addressing some semgrep issues

This commit is contained in:
2026-04-01 19:27:37 -05:00
parent 4b525f4641
commit b342005e17
27 changed files with 776 additions and 60 deletions

View File

@@ -17,17 +17,20 @@ Environment Variables:
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
import psycopg2
import psycopg2.extras
from psycopg2 import sql
import yaml
# Default configuration
DEFAULT_DATABASE_URL = "postgresql://postgres:postgres@localhost:5432/attune"
DEFAULT_PACKS_DIR = "./packs"
SCHEMA_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
def generate_label(name: str) -> str:
@@ -64,8 +67,13 @@ class PackLoader:
self.conn.autocommit = False
# Set search_path to use the correct schema
if not SCHEMA_RE.match(self.schema):
raise ValueError(f"Invalid schema name: {self.schema}")
cursor = self.conn.cursor()
cursor.execute(f"SET search_path TO {self.schema}, public")
# nosemgrep: python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query -- This uses psycopg2.sql.Identifier for safe identifier composition after schema-name validation.
cursor.execute(
sql.SQL("SET search_path TO {}, public").format(sql.Identifier(self.schema))
)
cursor.close()
self.conn.commit()
@@ -81,6 +89,16 @@ class PackLoader:
with open(file_path, "r") as f:
return yaml.safe_load(f)
def resolve_pack_relative_path(self, base_dir: Path, relative_path: str) -> Path:
"""Resolve a pack-owned relative path and reject traversal outside the pack."""
candidate = (base_dir / relative_path).resolve()
pack_root = self.pack_dir.resolve()
if not candidate.is_relative_to(pack_root):
raise ValueError(
f"Resolved path '{candidate}' escapes pack root '{pack_root}'"
)
return candidate
def upsert_pack(self) -> int:
"""Create or update the pack"""
print("\n→ Loading pack metadata...")
@@ -412,7 +430,7 @@ class PackLoader:
The database ID of the workflow_definition row, or None on failure.
"""
actions_dir = self.pack_dir / "actions"
full_path = actions_dir / workflow_file_path
full_path = self.resolve_pack_relative_path(actions_dir, workflow_file_path)
if not full_path.exists():
print(f" ⚠ Workflow file '{workflow_file_path}' not found at {full_path}")
return None