Building a Cache-Aware Vector Embedding Generation Pipeline with Tekton and Redis


Iterating on our team's vector models has long been a pain point. Every tweak to the preprocessing logic or the model meant regenerating embeddings for the entire multi-gigabyte validation dataset, a process that routinely took hours. Our CI/CD pipeline, originally built to speed up delivery, had become the main bottleneck. The problem was clear: most of the computation was redundant. When neither the input data nor the processing logic changes, the output vectors should not change either.

The obvious first idea was caching. But in a container-based pipeline environment like Tekton, traditional filesystem-based caching (such as Tekton's own Workspace PVCs) is clumsy: sharing a cache across PipelineRuns is awkward to configure, and parallel tasks can race on reads and writes. We needed a more efficient, centralized caching solution.

The technology choices fell into place quickly:

  1. Tekton: the pipeline orchestrator. Its Kubernetes-native design gives us fine-grained control over each step's resources and environment, which is critical for resource-intensive ML tasks.
  2. Python: the implementation language for the ML tasks. No debate there.
  3. Redis: the pivotal choice. We did not treat it as a plain key-value store; we chose Redis Stack, which ships with vector search built in. Two considerations drove this (a storage-format sketch follows the list):
    • High-speed cache layer: Redis's in-memory read/write performance backs our embedding cache. The key is a hash of the input data; the value is the generated vector.
    • Instant validation: since the vectors already live in Redis, its built-in vector index and search let us run a quick smoke test on newly generated vectors inside the pipeline itself, e.g. checking whether a reference point's nearest neighbor matches expectations. Errors surface earlier.
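
As a quick preview of this storage scheme, here is a standalone sketch assuming a locally reachable Redis instance (the key prefix and the 384-dimension vector are illustrative): the vector is serialized to raw float32 bytes, which is compact and round-trips losslessly.

# sketch: the cache round trip (hypothetical, standalone)
import hashlib
import numpy as np
import redis

client = redis.Redis.from_url("redis://localhost:6379")

text = "hello world"
# Key: a namespaced prefix plus the SHA-256 of the input text
key = f"embeddings:v1:{hashlib.sha256(text.encode('utf-8')).hexdigest()}"

# Value: the vector as raw float32 bytes (compact, lossless)
vector = np.random.rand(384).astype(np.float32)  # stand-in for model.encode(...)
client.set(key, vector.tobytes())

# A later run reconstructs the array straight from the cached bytes
restored = np.frombuffer(client.get(key), dtype=np.float32)
assert np.array_equal(vector, restored)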

Our goal was a pipeline that can tell which data fragments need their embeddings recomputed and which can be pulled straight from Redis, compressing a no-op run (one with no substantive changes) from hours down to minutes.

Step 1: The Baseline Pipeline, Slow but Functional

Before optimizing, we needed a working baseline. This version is entirely straightforward: a Tekton Task takes a dataset of raw text, processes it line by line with a Python script to generate vectors, and writes the result to a file.

Here is the initial Task definition, with no caching logic at all.

# tekton/tasks/generate-embeddings-v1.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: generate-embeddings-v1
spec:
  params:
    - name: source-data-path
      description: The path to the raw text data file within the workspace.
      type: string
      default: "data/raw_corpus.txt"
    - name: output-embeddings-path
      description: The path to store the generated embeddings file.
      type: string
      default: "output/embeddings.json"
    - name: model-name
      description: The sentence-transformer model to use.
      type: string
      default: "all-MiniLM-L6-v2"
  workspaces:
    - name: data
      description: The workspace containing source data and for storing outputs.
  steps:
    - name: process-data
      image: python:3.9-slim
      workingDir: $(workspaces.data.path)
      script: |
        #!/usr/bin/env bash
        set -e
        
        echo "--> Installing dependencies..."
        pip install sentence-transformers torch numpy
        
        echo "--> Running embedding generation script..."
        python3 <<'EOF'
        import os
        import json
        import hashlib
        from sentence_transformers import SentenceTransformer
        
        # --- Configuration ---
        SOURCE_PATH = "$(params.source-data-path)"
        OUTPUT_PATH = "$(params.output-embeddings-path)"
        MODEL_NAME = "$(params.model-name)"
        
        print(f"Loading model: {MODEL_NAME}")
        model = SentenceTransformer(MODEL_NAME)
        
        embeddings_map = {}
        
        print(f"Reading source data from: {SOURCE_PATH}")
        if not os.path.exists(SOURCE_PATH):
            print(f"Error: Source file not found at {SOURCE_PATH}")
            exit(1)
            
        with open(SOURCE_PATH, 'r', encoding='utf-8') as f:
            lines = [line.strip() for line in f if line.strip()]
        
        print(f"Found {len(lines)} lines to process.")
        
        # --- Core Logic ---
        # This is the slow part we want to optimize
        embeddings = model.encode(lines, show_progress_bar=True)
        
        for i, line in enumerate(lines):
            # Use a hash of the content as a unique identifier
            line_hash = hashlib.sha256(line.encode('utf-8')).hexdigest()
            embeddings_map[line_hash] = {
                "text": line,
                "vector": embeddings[i].tolist()
            }
        
        print(f"Generated {len(embeddings_map)} embeddings.")
        
        # --- Save Output ---
        output_dir = os.path.dirname(OUTPUT_PATH)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            
        with open(OUTPUT_PATH, 'w', encoding='utf-8') as f:
            json.dump(embeddings_map, f, ensure_ascii=False, indent=2)
            
        print(f"Successfully saved embeddings to {OUTPUT_PATH}")
        EOF

This Task works, but every PipelineRun executes model.encode in full even if not a single character of raw_corpus.txt has changed. That is exactly the problem we set out to solve.

Step 2: Introducing Redis as a High-Speed Cache Layer

Now we rework the Task to query Redis before computing anything.

Architecture design (the diagram shows the per-line flow; the implementation below batches lookups with MGET and writes through a Redis pipeline):

graph TD
    A[Tekton PipelineRun Start] --> B{Task: generate-embeddings-cached};
    B --> C[Python Script Starts];
    C --> D{For each line in raw_corpus.txt};
    D --> E[Calculate SHA256 Hash of line];
    E --> F{Query Redis with Hash};
    F -- Exists --> G[Append vector from Redis to results];
    F -- Not Exists --> H[Compute vector using model.encode];
    H --> I[Store vector in Redis with Hash as Key];
    I --> J[Append new vector to results];
    G --> K[Next line];
    J --> K;
    K --> D;
    D -- All lines processed --> L[Save final embeddings file];
    L --> M[Task Finish];

For the Tekton Task to reach Redis, we pass the connection details securely through a Kubernetes Secret.

Assume we have already created a Secret named redis-connection:

kubectl create secret generic redis-connection --from-literal=REDIS_URL='redis://:<password>@<redis-host>:6379'

Now for the core components: the cache-aware Python script and its matching Tekton Task.

The cache-aware Python processing logic (process_with_cache.py):
This version is more robust, with configuration management, logging, error handling, and the Redis interaction.

# scripts/process_with_cache.py

import os
import json
import hashlib
import logging
import sys
import numpy as np
import redis
from sentence_transformers import SentenceTransformer

# --- Standardized Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)

# --- Configuration Loading ---
class Config:
    """Load configuration from environment variables."""
    try:
        REDIS_URL = os.environ['REDIS_URL']
        SOURCE_PATH = os.environ['SOURCE_DATA_PATH']
        OUTPUT_PATH = os.environ['OUTPUT_EMBEDDINGS_PATH']
        MODEL_NAME = os.environ.get('MODEL_NAME', 'all-MiniLM-L6-v2')
        # A prefix to namespace keys in Redis, preventing collisions
        REDIS_KEY_PREFIX = os.environ.get('REDIS_KEY_PREFIX', 'embeddings:v1')
    except KeyError as e:
        logging.error(f"Missing essential environment variable: {e}")
        sys.exit(1)

def get_redis_client(redis_url: str) -> redis.Redis:
    """
    Establish a connection to Redis and verify it.
    Includes robust error handling for production environments.
    """
    try:
        client = redis.Redis.from_url(redis_url, decode_responses=False)
        client.ping()
        logging.info("Successfully connected to Redis.")
        return client
    except redis.exceptions.ConnectionError as e:
        logging.error(f"Failed to connect to Redis at {redis_url}. Error: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"An unexpected error occurred during Redis connection: {e}")
        sys.exit(1)

def generate_cache_key(prefix: str, content: str) -> str:
    """Generate a consistent cache key."""
    content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
    return f"{prefix}:{content_hash}"

def main():
    """Main execution logic."""
    config = Config()
    redis_client = get_redis_client(config.REDIS_URL)

    logging.info(f"Loading model: {config.MODEL_NAME}")
    model = SentenceTransformer(config.MODEL_NAME)
    embedding_dim = model.get_sentence_embedding_dimension()

    logging.info(f"Reading source data from: {config.SOURCE_PATH}")
    if not os.path.exists(config.SOURCE_PATH):
        logging.error(f"Source file not found at {config.SOURCE_PATH}")
        sys.exit(1)

    with open(config.SOURCE_PATH, 'r', encoding='utf-8') as f:
        lines = [line.strip() for line in f if line.strip()]

    logging.info(f"Found {len(lines)} unique lines to process.")

    final_embeddings = {}
    lines_to_compute = []
    cache_keys_to_compute = []
    
    # --- Phase 1: Cache Checking ---
    # We batch check Redis for better performance.
    cache_keys = [generate_cache_key(config.REDIS_KEY_PREFIX, line) for line in lines]
    cached_vectors_raw = redis_client.mget(cache_keys)

    for i, raw_vector in enumerate(cached_vectors_raw):
        line = lines[i]
        cache_key = cache_keys[i]
        
        if raw_vector:
            # Vector found in cache
            vector = np.frombuffer(raw_vector, dtype=np.float32).tolist()
            final_embeddings[cache_key.split(':')[-1]] = {
                "text": line,
                "vector": vector
            }
        else:
            # Vector not in cache, add to batch for computation
            lines_to_compute.append(line)
            cache_keys_to_compute.append(cache_key)

    cache_hits = len(lines) - len(lines_to_compute)
    logging.info(f"Cache lookup complete. Hits: {cache_hits}, Misses: {len(lines_to_compute)}")

    # --- Phase 2: Compute for Cache Misses ---
    if lines_to_compute:
        logging.info(f"Computing embeddings for {len(lines_to_compute)} new items...")
        new_embeddings = model.encode(lines_to_compute, show_progress_bar=True)
        
        # --- Phase 3: Update Cache and Final Results ---
        # Use a Redis pipeline for efficient bulk insertion
        pipe = redis_client.pipeline()
        for i, line in enumerate(lines_to_compute):
            vector = new_embeddings[i]
            cache_key = cache_keys_to_compute[i]
            
            # Store vector in a compact binary format
            pipe.set(cache_key, vector.astype(np.float32).tobytes())
            
            final_embeddings[cache_key.split(':')[-1]] = {
                "text": line,
                "vector": vector.tolist()
            }
        
        try:
            pipe.execute()
            logging.info(f"Successfully stored {len(lines_to_compute)} new vectors in Redis cache.")
        except redis.exceptions.RedisError as e:
            logging.error(f"Failed to write to Redis cache: {e}")
            # In a real project, you might want to decide if this is a fatal error.
            # For now, we continue, but the cache won't be updated.
    
    # --- Save Final Output ---
    output_dir = os.path.dirname(config.OUTPUT_PATH)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
        
    with open(config.OUTPUT_PATH, 'w', encoding='utf-8') as f:
        json.dump(final_embeddings, f, ensure_ascii=False, indent=2)
        
    logging.info(f"Process complete. Total embeddings: {len(final_embeddings)}. Saved to {config.OUTPUT_PATH}")

if __name__ == "__main__":
    main()

The updated Tekton Task (generate-embeddings-v2.yaml):

# tekton/tasks/generate-embeddings-v2.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: generate-embeddings-cached
spec:
  params:
    - name: source-data-path
      type: string
      default: "data/raw_corpus.txt"
    - name: output-embeddings-path
      type: string
      default: "output/embeddings.json"
    - name: model-name
      type: string
      default: "all-MiniLM-L6-v2"
    - name: python-script
      type: string
      description: The python script to execute.
  workspaces:
    - name: source
      description: The workspace containing source data, scripts and for storing outputs.
  steps:
    - name: process-data-with-cache
      image: python:3.9-slim
      workingDir: $(workspaces.source.path)
      env:
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: redis-connection # The secret we created
              key: REDIS_URL
        - name: SOURCE_DATA_PATH
          value: $(params.source-data-path)
        - name: OUTPUT_EMBEDDINGS_PATH
          value: $(params.output-embeddings-path)
        - name: MODEL_NAME
          value: $(params.model-name)
        - name: REDIS_KEY_PREFIX
          value: "embeddings:$(params.model-name)" # Namespace by model
      script: |
        #!/usr/bin/env bash
        set -e
        
        echo "--> Installing dependencies..."
        pip install sentence-transformers torch numpy redis
        
        echo "--> Running embedding generation script with cache..."
        python3 $(params.python-script)

Now, on the first pipeline run, the logs show:
Cache lookup complete. Hits: 0, Misses: 10000
Computing embeddings for 10000 new items...

On the second run, with the data unchanged, the logs read:
Cache lookup complete. Hits: 10000, Misses: 0
Process complete. Total embeddings: 10000.

Total task execution time dropped from an hour to under a minute, most of which is now Python dependency installation and data loading.

Step 3: Adding a Vector Validation Step to the Pipeline

Generating vectors is not enough; we also need to make sure a model change has not introduced unexpected drift. With Redis Stack's vector search, we can add a Task that quickly validates the freshly generated vectors.

Validation logic (a sketch of the index setup follows this list):

  1. We pre-load a golden set into Redis: texts paired with their verified, trusted vectors.
  2. We create a vector index over that set.
  3. In the pipeline, once the new vectors are generated, we pick a few key test samples.
  4. We run a KNN (K-Nearest Neighbors) search in Redis using each newly generated vector.
  5. We assert that the top-1 neighbor returned is the sample itself. If it is not, or the similarity falls below a threshold, the new model has likely drifted and the pipeline should fail.
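
Note that the golden-set index is never created inside this pipeline; the validation script below assumes it already exists. A minimal setup sketch using redis-py might look like this (the golden: key prefix, field names, and the 384-dimension figure for all-MiniLM-L6-v2 are our assumptions):

# scripts/setup_validation_index.py (hypothetical one-off setup helper)
import numpy as np
import redis
from redis.commands.search.field import TextField, VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

client = redis.Redis.from_url("redis://localhost:6379", decode_responses=False)

index_name = "idx:embeddings-validation:all-MiniLM-L6-v2"
try:
    # FLAT index with COSINE distance over hash keys prefixed "golden:"
    client.ft(index_name).create_index(
        fields=[
            TextField("text"),
            VectorField(
                "vector",
                "FLAT",
                {"TYPE": "FLOAT32", "DIM": 384, "DISTANCE_METRIC": "COSINE"},
            ),
        ],
        definition=IndexDefinition(prefix=["golden:"], index_type=IndexType.HASH),
    )
except redis.exceptions.ResponseError:
    pass  # index already exists

# One golden entry: the text plus its verified vector as float32 bytes
verified_vector = np.zeros(384, dtype=np.float32)  # placeholder for a vetted vector
client.hset(
    "golden:example",
    mapping={"text": "example sentence", "vector": verified_vector.tobytes()},
)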

The validation task's Python script (validate_vectors.py):

# scripts/validate_vectors.py
import os
import json
import hashlib  # needed below to hash the validation texts
import logging
import sys
import numpy as np
import redis
from redis.commands.search.query import Query

# --- Logging setup as before ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stdout)

# --- Config ---
class Config:
    REDIS_URL = os.environ['REDIS_URL']
    # Path to the newly generated embeddings from previous step
    INPUT_EMBEDDINGS_PATH = os.environ['INPUT_EMBEDDINGS_PATH']
    # Path to a small file with text lines to test against
    VALIDATION_SET_PATH = os.environ['VALIDATION_SET_PATH']
    MODEL_NAME = os.environ.get('MODEL_NAME', 'all-MiniLM-L6-v2')
    REDIS_INDEX_NAME = f"idx:embeddings-validation:{MODEL_NAME}"
    SIMILARITY_THRESHOLD = float(os.environ.get('SIMILARITY_THRESHOLD', '0.99'))

def get_redis_client(redis_url: str):
    # Same implementation as before
    try:
        client = redis.Redis.from_url(redis_url, decode_responses=False)
        client.ping()
        logging.info("Successfully connected to Redis for validation.")
        return client
    except Exception as e:
        logging.error(f"Redis connection failed for validation: {e}")
        sys.exit(1)

def main():
    config = Config()
    redis_client = get_redis_client(config.REDIS_URL)
    rs = redis_client.ft(config.REDIS_INDEX_NAME)

    # For this example, we assume the golden set is already indexed.
    # In a real scenario, another Tekton task might be responsible for setting this up.
    
    logging.info(f"Loading embeddings to validate from: {config.INPUT_EMBEDDINGS_PATH}")
    with open(config.INPUT_EMBEDDINGS_PATH, 'r') as f:
        embeddings_map = json.load(f)

    logging.info(f"Loading validation set texts from: {config.VALIDATION_SET_PATH}")
    with open(config.VALIDATION_SET_PATH, 'r') as f:
        validation_texts = [line.strip() for line in f if line.strip()]

    all_passed = True
    for text in validation_texts:
        text_hash = hashlib.sha256(text.encode('utf-8')).hexdigest()
        
        if text_hash not in embeddings_map:
            logging.warning(f"Validation text '{text[:30]}...' not found in generated embeddings. Skipping.")
            continue
            
        vector_to_check = np.array(embeddings_map[text_hash]['vector'], dtype=np.float32)
        
        # Build a KNN query
        q = (
            Query("*=>[KNN 1 @vector $query_vec as score]")
            .sort_by("score")
            .return_fields("text", "score")
            .dialect(2)
        )
        query_params = {"query_vec": vector_to_check.tobytes()}
        
        try:
            results = rs.search(q, query_params)
            
            if not results.docs:
                logging.error(f"Validation failed for '{text[:30]}...': No results returned from KNN search.")
                all_passed = False
                continue

            top_result = results.docs[0]
            # Redis KNN returns cosine *distance*: identical vectors score 0,
            # opposite vectors score 2. Rescale to a [0, 1] similarity.
            similarity = (2 - float(top_result.score)) / 2 if hasattr(top_result, 'score') else 0

            # The nearest neighbor should be the text itself
            if top_result.text == text and similarity >= config.SIMILARITY_THRESHOLD:
                logging.info(f"Validation PASSED for '{text[:30]}...' (Similarity: {similarity:.4f})")
            else:
                logging.error(
                    f"Validation FAILED for '{text[:30]}...'. "
                    f"Expected self, got '{top_result.text[:30]}...'. "
                    f"Similarity: {similarity:.4f} (Threshold: {config.SIMILARITY_THRESHOLD})"
                )
                all_passed = False
        except Exception as e:
            logging.error(f"Error during Redis search for validation: {e}")
            all_passed = False

    if not all_passed:
        logging.error("One or more validation checks failed.")
        sys.exit(1)
    
    logging.info("All validation checks passed successfully.")

if __name__ == "__main__":
    main()

The Final Pipeline (Pipeline Definition)

Now we chain the two Tasks together into a complete Pipeline:

# tekton/pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: vector-embedding-ci
spec:
  workspaces:
    - name: shared-data
  params:
    - name: git-repo-url
      type: string
    - name: git-revision
      type: string
      default: "main"
      
  tasks:
    - name: fetch-source
      taskRef:
        name: git-clone
      workspaces:
        - name: output
          workspace: shared-data
      params:
        - name: url
          value: $(params.git-repo-url)
        - name: revision
          value: $(params.git-revision)
          
    - name: generate-embeddings
      taskRef:
        name: generate-embeddings-cached
      runAfter: ["fetch-source"]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: source-data-path
          value: "data/raw_corpus.txt"
        - name: output-embeddings-path
          value: "output/embeddings.json"
        - name: python-script
          value: "scripts/process_with_cache.py"
          
    - name: validate-embeddings
      taskRef:
        name: validate-embeddings-task # Assume this task is defined similar to generate-embeddings-cached
      runAfter: ["generate-embeddings"]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: input-embeddings-path
          value: "output/embeddings.json"
        - name: validation-set-path
          value: "data/validation_set.txt"
        - name: python-script
          value: "scripts/validate_vectors.py"

This approach is not without limitations. First, the stability and capacity of the Redis instance are now on the critical path of the entire CI flow: if Redis goes down or loses its data, cache misses degrade every run back to full recomputation. Second, our cache key is derived from the input text alone, so it cannot detect changes to the model file itself; a more complete scheme would fold a model version or model hash into the key, as sketched below. Finally, for very large datasets, loading all text into memory for hashing in one pass can itself become a bottleneck; a future iteration could process the corpus as a stream, exchanging chunks with Redis incrementally to cut the memory footprint.
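
A minimal sketch of such a versioned key scheme (the model_version argument is our assumption; in practice it could be a hash of the model weights or the revision tag used to pull the model):

# sketch: model-aware cache keys (hypothetical extension of generate_cache_key)
import hashlib

def generate_versioned_cache_key(prefix: str, model_version: str, content: str) -> str:
    """Cache key that is invalidated automatically when the model changes."""
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
    return f"{prefix}:{model_version}:{content_hash}"

# A new model_version changes every key, so a stale cache is simply
# bypassed rather than corrupted.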

