Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions 8 CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 2.4.9

### Added: opt-in streaming log channel via `--upload-logs`

- New `--upload-logs` flag (default off). When set, each CLI invocation registers a run, reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`), and uploads a transcript of its own log output to the Socket backend for that run, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged.
- New `--no-upload-logs` flag (mutually exclusive with `--upload-logs`) explicitly opts the run out of uploading logs, even when an org-level override would otherwise enable it. Use this when you need a guaranteed no-upload guarantee (e.g. legal/consent reasons).
- The Socket backend can also force-enable streaming for specific orgs in the absence of an explicit opt-out. The feature is best-effort — registration or upload failures silently degrade and never block the scan.

## 2.4.8

### Fixed: retry transient full-scan upload failures
Expand Down
2 changes: 1 addition & 1 deletion 2 pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "hatchling.build"

[project]
name = "socketsecurity"
version = "2.4.8"
version = "2.4.9"
requires-python = ">= 3.11"
license = {"file" = "LICENSE"}
dependencies = [
Expand Down
2 changes: 1 addition & 1 deletion 2 socketsecurity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__author__ = 'socket.dev'
__version__ = '2.4.8'
__version__ = '2.4.9'
USER_AGENT = f'SocketPythonCLI/{__version__}'
24 changes: 24 additions & 0 deletions 24 socketsecurity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class CliConfig:
ignore_commit_files: bool = False
disable_blocking: bool = False
disable_ignore: bool = False
# Tri-state log-upload preference: True = --upload-logs, False = --no-upload-logs,
# None = neither (server-side override decides).
upload_logs: Optional[bool] = None
strict_blocking: bool = False
integration_type: IntegrationType = "api"
integration_org_slug: Optional[str] = None
Expand Down Expand Up @@ -282,6 +285,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
'ignore_commit_files': args.ignore_commit_files,
'disable_blocking': args.disable_blocking,
'disable_ignore': args.disable_ignore,
'upload_logs': args.upload_logs,
'strict_blocking': args.strict_blocking,
'integration_type': args.integration,
'pending_head': args.pending_head,
Expand Down Expand Up @@ -866,6 +870,26 @@ def create_argument_parser() -> argparse.ArgumentParser:
action="store_true",
help=argparse.SUPPRESS
)
log_upload_group = advanced_group.add_mutually_exclusive_group()
log_upload_group.add_argument(
"--upload-logs",
dest="upload_logs",
action="store_const",
const=True,
help="Upload the CLI's log output to the Socket backend for this run. "
"When set, the CLI registers the run with share_logs=true and streams "
"its log records in 5s batches. Default off. Mutually exclusive with "
"--no-upload-logs."
)
log_upload_group.add_argument(
"--no-upload-logs",
dest="upload_logs",
action="store_const",
const=False,
help="Explicitly opt out of uploading CLI logs to the Socket backend, even "
"when an org-level override would otherwise enable it. Mutually "
"exclusive with --upload-logs."
)
advanced_group.add_argument(
"--strict-blocking",
dest="strict_blocking",
Expand Down
79 changes: 79 additions & 0 deletions 79 socketsecurity/core/cli_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Lifecycle helpers for a CLI run on the Socket backend.

A "run" represents a single CLI invocation. `register_cli_run` opens it and
returns a server-issued `run_id` when streaming is enabled; `finalize_cli_run`
closes it on exit. The run_id keys the rows that `BatchedLogUploader` POSTs to
`/python-cli-runs/<run_id>/logs` during the run so the dashboard can show
what the user saw in their terminal.

Streaming is opt-in via the `share_logs` field on register. The server may
also force-enable streaming for an org regardless of the client's request,
so the CLI always calls register and gates on the response's
`log_streaming_enabled` flag rather than the client's intent.

Both calls are best-effort: failures fall back to no-streaming and never
prevent the scan from running.
"""

import json
import logging
from typing import Optional

from .cli_client import CliClient

log = logging.getLogger("socketcli")


def register_cli_run(
client: CliClient,
client_version: str,
upload_logs: Optional[bool],
) -> Optional[str]:
"""Register a CLI run with the backend.

`upload_logs` is the user's tri-state preference (True / False / None);
it's projected to the wire-format `share_logs` and `decline_logs`
booleans here, at the API boundary.

Best-effort: any failure (network, malformed response, unexpected JSON
shape, etc.) falls back to no-streaming and must never prevent the
scan from running. The single broad except is intentional.
"""
try:
resp = client.request(
path="python-cli-runs",
method="POST",
payload=json.dumps({
"client_version": client_version,
"share_logs": upload_logs is True,
"decline_logs": upload_logs is False,
}),
)
body = resp.json()
if not body.get("log_streaming_enabled"):
log.debug("cli-run register: log streaming not enabled by server")
return None
run_id = body.get("run_id")
if not isinstance(run_id, str) or not run_id:
log.debug(f"cli-run register: enabled but missing run_id in response: {body!r}")
return None
return run_id
except Exception as e:
log.debug(f"cli-run register failed (streaming disabled): {e}")
return None


def finalize_cli_run(
client: CliClient,
run_id: str,
status: str = "success",
report_run_id: Optional[str] = None,
) -> None:
try:
client.request(
path=f"python-cli-runs/{run_id}/finalize",
method="POST",
payload=json.dumps({"status": status, "report_run_id": report_run_id}),
)
except Exception as e:
log.debug(f"cli-run finalize failed (swallowed): {e}")
112 changes: 112 additions & 0 deletions 112 socketsecurity/core/log_uploader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Buffer the CLI's local log records and POST them in batches to
/python-cli-runs/<run_id>/logs so the dashboard's view of a CLI run
mirrors what the user sees in their terminal.

Behavior:
- daemon thread, 5s flush
- swallow all network errors (debug log only)
- skip empty buffers
- drain on shutdown
- at-most-once semantics (failed batches dropped, not retried)

A thread-local recursion guard prevents the uploader's own request-error
log lines (emitted by `cli_client.py`'s `socketdev` logger) from being
re-enqueued during a flush.
"""

import json
import logging
import threading
from datetime import datetime, timezone
from typing import Optional

from .cli_client import CliClient

log = logging.getLogger(__name__)

_FLUSH_GUARD = threading.local()


def _now_str() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


class BatchedLogUploader:
def __init__(
self,
client: CliClient,
run_id: str,
flush_interval: float = 5.0,
):
self._client = client
self._run_id = run_id
self._flush_interval = flush_interval
self._buf: list = []
self._lock = threading.Lock()
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None

def add(self, entry: dict) -> None:
with self._lock:
self._buf.append(entry)

def start(self) -> None:
if self._thread is not None:
return
self._thread = threading.Thread(
target=self._run,
name=f"socket-log-uploader-{self._run_id[:8]}",
daemon=True,
)
self._thread.start()

def stop(self, timeout: float = 2.0) -> None:
if self._thread is not None:
self._stop.set()
self._thread.join(timeout=timeout)
self._thread = None
self._flush()

def _run(self) -> None:
while not self._stop.is_set():
self._flush()
self._stop.wait(self._flush_interval)

def _flush(self) -> None:
with self._lock:
if not self._buf:
return
batch = self._buf
self._buf = []

_FLUSH_GUARD.active = True
try:
self._client.request(
path=f"python-cli-runs/{self._run_id}/logs",
method="POST",
payload=json.dumps({"logs": batch}),
)
except Exception as e:
log.debug(f"log upload failed (swallowed, {len(batch)} entries dropped): {e}")
finally:
_FLUSH_GUARD.active = False


class UploadingLogHandler(logging.Handler):
def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"):
super().__init__()
self._uploader = uploader
self._context = context

def emit(self, record: logging.LogRecord) -> None:
if getattr(_FLUSH_GUARD, "active", False):
return
Comment thread
BarrensZeppelin marked this conversation as resolved.
try:
self._uploader.add({
"timestamp": _now_str(),
"level": logging.getLevelName(record.levelno),
"message": self.format(record),
"context": self._context,
})
except Exception:
self.handleError(record)
112 changes: 112 additions & 0 deletions 112 socketsecurity/core/streaming.py
Comment thread
BarrensZeppelin marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Server log streaming pipeline for one CLI run.

`StreamingLogs` is a context manager. On enter it registers a run with the
backend, attaches handlers that route the CLI's own log output through both
the local terminal and a batched uploader, and forces the loggers into DEBUG
so the upload captures everything regardless of local terminal verbosity.
On exit it tears the handlers back down and finalizes the run; the status
sent to finalize is inferred from the exception that closed the `with`
block (success / failure / cancelled).

If registration fails the manager becomes a no-op — nothing is wired up and
__exit__ does nothing.
"""

import logging
from typing import Optional

from .cli_client import CliClient
from .cli_run import finalize_cli_run, register_cli_run
from .log_uploader import BatchedLogUploader, UploadingLogHandler


class StreamingLogs:
def __init__(
self,
*,
client: CliClient,
cli_logger: logging.Logger,
sdk_logger: logging.Logger,
client_version: str,
upload_logs: Optional[bool],
enable_debug: bool,
):
self._client = client
self._loggers = (cli_logger, sdk_logger)
self._client_version = client_version
self._upload_logs = upload_logs
self._enable_debug = enable_debug

self._run_id: Optional[str] = None
self._report_run_id: Optional[str] = None
self._uploader: Optional[BatchedLogUploader] = None
self._upload_handler: Optional[UploadingLogHandler] = None
self._terminal_handler: Optional[logging.StreamHandler] = None
self._saved_levels: tuple = ()
self._saved_propagate: tuple = ()

def set_report_run_id(self, report_run_id: Optional[str]) -> None:
self._report_run_id = report_run_id

def __enter__(self) -> "StreamingLogs":
self._run_id = register_cli_run(
self._client,
client_version=self._client_version,
upload_logs=self._upload_logs,
)
cli_logger = self._loggers[0]
if not self._run_id:
cli_logger.debug("server log streaming not active for this run")
return self

self._uploader = BatchedLogUploader(self._client, self._run_id)
self._uploader.start()
self._upload_handler = UploadingLogHandler(self._uploader, context="socket-python-cli")
self._upload_handler.setFormatter(logging.Formatter("%(message)s"))

self._terminal_handler = logging.StreamHandler()
self._terminal_handler.setLevel(logging.DEBUG if self._enable_debug else logging.INFO)
self._terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

self._saved_levels = tuple(lg.level for lg in self._loggers)
self._saved_propagate = tuple(lg.propagate for lg in self._loggers)
for lg in self._loggers:
lg.setLevel(logging.DEBUG)
lg.propagate = False
lg.addHandler(self._terminal_handler)
lg.addHandler(self._upload_handler)

cli_logger.debug(f"server log streaming enabled (run_id={self._run_id})")
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
if self._run_id is None:
return False

status = self._status_for_exit(exc_type, exc_val)
for lg in self._loggers:
lg.removeHandler(self._upload_handler)
self._uploader.stop()
finalize_cli_run(
self._client,
self._run_id,
status=status,
report_run_id=self._report_run_id,
)
for lg in self._loggers:
lg.removeHandler(self._terminal_handler)
for lg, level, propagate in zip(self._loggers, self._saved_levels, self._saved_propagate):
lg.setLevel(level)
lg.propagate = propagate
return False

@staticmethod
def _status_for_exit(exc_type, exc_val) -> str:
if exc_type is None:
return "success"
if issubclass(exc_type, KeyboardInterrupt):
return "cancelled"
# SystemExit with code 0 / None is a clean exit; non-zero codes signal failure.
if issubclass(exc_type, SystemExit) and not getattr(exc_val, "code", None):
return "success"
return "failure"
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.