#!/usr/bin/env python3
|
|
"""
|
|
Tool to get info.json from the Thrift service.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import logging
|
|
import codecs
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Configure logging with a default console handler at INFO level.
# NOTE(review): the note below says the CLI entrypoint configures the root
# logger; calling basicConfig() here at import time may be redundant or
# conflict with that — confirm which one should own root-logger setup.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Note: The CLI entrypoint will configure the root logger.
# We get our own logger here for namespacing.
logger = logging.getLogger('get_info_tool')

# Import Thrift modules
# Add project's thrift gen_py path to allow importing 'pangramia'.
# This script lives one directory below the project root; the generated
# Thrift bindings are emitted into thrift_model/gen_py.
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(script_dir, '..'))
sys.path.insert(0, os.path.join(project_root, 'thrift_model', 'gen_py'))
# These imports must stay AFTER the sys.path mutation above, or the
# 'pangramia' package will not resolve.
from thrift.transport import TTransport
from pangramia.yt.common.ttypes import TokenUpdateMode
from pangramia.yt.exceptions.ttypes import PBServiceException, PBUserException
from yt_ops_services.client_utils import get_thrift_client
from ytops_client.request_params_help import REQUEST_PARAMS_HELP_STRING
|
|
|
|
|
|
def get_video_id(url: str) -> str:
    """Extract an 11-character YouTube video ID from a URL or bare ID.

    Recognizes watch URLs (``?v=ID``), ``youtu.be/ID`` short links,
    ``/shorts/ID``, ``/embed/ID`` and ``/live/ID`` paths, as well as a
    bare 11-character video ID passed directly.

    Args:
        url: A YouTube URL or a plain video ID.

    Returns:
        The extracted video ID, or the sentinel string ``"unknown_video_id"``
        when no ID can be found. Callers use the result in generated
        filenames, so this deliberately never raises.
    """
    # YouTube video IDs are exactly 11 characters from this alphabet.
    id_group = r"([0-9A-Za-z_-]{11})"

    # Try each known URL shape in order; 'v=' first matches standard
    # watch URLs like https://www.youtube.com/watch?v=VIDEO_ID.
    for prefix in (r"v=", r"youtu\.be/", r"shorts/", r"embed/", r"live/"):
        match = re.search(prefix + id_group, url)
        if match:
            return match.group(1)

    # A bare video ID passed instead of a URL.
    if re.fullmatch(r'[0-9A-Za-z_-]{11}', url):
        return url

    return "unknown_video_id"
|
|
|
|
|
|
def parse_key_value_params(params_str: str) -> Dict[str, Any]:
    """Parse a comma-separated ``key=value`` string into a nested dict.

    Dotted keys (``a.b.c=1``) create nested dictionaries. Values are parsed
    as JSON primitives when possible; quoted values and unparseable values
    are kept as strings. Pairs without ``=`` are skipped with a warning.
    """
    result: Dict[str, Any] = {}
    if not params_str:
        return result

    for pair in params_str.split(','):
        if '=' not in pair:
            logger.warning(f"Skipping malformed parameter pair: {pair}")
            continue

        raw_key, raw_value = pair.split('=', 1)
        path = raw_key.strip().split('.')

        # Quoted values bypass JSON parsing and keep their inner text.
        double_quoted = raw_value.startswith('"') and raw_value.endswith('"')
        single_quoted = raw_value.startswith("'") and raw_value.endswith("'")
        if double_quoted or single_quoted:
            parsed = raw_value[1:-1]
        else:
            try:
                parsed = json.loads(raw_value)
            except json.JSONDecodeError:
                # Not a JSON primitive: fall back to the raw string.
                parsed = raw_value

        # Walk/create the nested dicts for all but the last path segment,
        # replacing any non-dict intermediate with a fresh dict.
        node = result
        for segment in path[:-1]:
            child = node.get(segment)
            if not isinstance(child, dict):
                child = {}
                node[segment] = child
            node = child
        node[path[-1]] = parsed

    return result
|
|
|
|
|
|
def add_get_info_parser(subparsers):
    """Add the parser for the 'get-info' command.

    Registers the 'get-info' subcommand and all of its options on the given
    argparse subparsers object. Returns the created parser so callers can
    attach defaults or further options.
    """
    parser = subparsers.add_parser(
        'get-info',
        description='Get info.json from Thrift service',
        # RawTextHelpFormatter preserves the embedded newlines in the long
        # multi-line help strings below (e.g. --client).
        formatter_class=argparse.RawTextHelpFormatter,
        help='Get info.json from the Thrift service.'
    )
    parser.add_argument('url', help='YouTube URL or video ID')

    # --- Server connection options ---
    parser.add_argument('--host', default='127.0.0.1', help="Thrift server host. Using 127.0.0.1 avoids harmless connection errors when the local Envoy proxy only listens on IPv4.")
    parser.add_argument('--port', type=int, default=9080, help='Thrift server port')
    # --auth-host/--auth-port take precedence over --host/--port in
    # main_get_info when provided.
    parser.add_argument('--auth-host', help='Thrift server host (overrides --host).')
    parser.add_argument('--auth-port', type=int, help='Thrift server port (overrides --port).')

    # --- Request identity / client selection ---
    parser.add_argument('--profile', default='default_profile', help='The profile name (accountId) to use for the request.')
    parser.add_argument('--client', help='''Specific client to use. Overrides server default.
Available clients:
  web, web_safari, web_embedded, web_music, web_creator, mweb
  android, android_music, android_creator, android_vr
  ios, ios_music, ios_creator
  tv, tv_simply, tv_embedded

Append "_camoufox" to any client name (e.g., "web_camoufox") to force
the browser-based generation strategy.''')

    # --- Output file naming ---
    parser.add_argument('--output', help='Output file path for the info.json. If not provided, prints to stdout.')
    parser.add_argument('--output-auto', action='store_true', help='Automatically generate output filename for info.json and invocation data. Format: DATETIME-CLIENT-VIDEOID-info.json')
    parser.add_argument('--output-auto-url-only', action='store_true', help='Automatically generate output filename for info.json (format: VIDEOID-info.json) and also save a copy to latest-info.json.')
    parser.add_argument('--output-auto-suffix', help='Suffix to add to the filename before "-info.json" when using --output-auto or --output-auto-url-only. E.g., "-cycle1".')
    parser.add_argument('--log-file-auto', action='store_true', help='Automatically generate a log filename and save all script logs to it. Format: VIDEOID-DATETIME.log')

    # --- Worker / machine identification and "latest" convenience copies ---
    parser.add_argument('--machine-id', help='Identifier for the client machine. Defaults to hostname.')
    parser.add_argument('--worker-id', help='Identifier for a worker process. Used for naming files with --save-latest.')
    parser.add_argument('--save-latest', action='store_true', help='Save a copy of the info.json to latest-info.json or [worker-id]-latest-info.json. This is implied by --output-auto-url-only.')

    # --- Proxy control ---
    parser.add_argument('--assigned-proxy-url', help='A specific proxy URL to use for the request, overriding the server\'s proxy pool logic.')
    parser.add_argument('--proxy-rename', help='Apply sed-style regex substitution to the assigned proxy URL. Format: s/pattern/replacement/')
    parser.add_argument('--print-proxy', action='store_true', help='Print the proxy used for the request to stderr.')

    # --- Diagnostics and logging ---
    parser.add_argument('--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--log-return', action='store_true', help='Log the full summary of the thrift response to stderr, including detailed logs.\nThis is a convenience flag that implies --show-prefetch-log, --show-nodejs-log, and --show-ytdlp-log.')
    parser.add_argument('--show-prefetch-log', action='store_true', help='Print the curl pre-fetch log from the server response.')
    parser.add_argument('--show-nodejs-log', action='store_true', help='Print the Node.js debug log from the server response.')
    parser.add_argument('--show-ytdlp-log', action='store_true', help='Print the yt-dlp debug log from the server response.')

    # --- Request behavior tweaks ---
    parser.add_argument('--direct', action='store_true', help='Use the direct yt-dlp info.json generation method, bypassing Node.js token generation.')
    parser.add_argument('--print-info-out', action='store_true', help='Print the final info.json to stdout. By default, output is suppressed unless writing to a file.')
    parser.add_argument('--request-params-json', help=REQUEST_PARAMS_HELP_STRING + '\nCan also be a comma-separated string of key=value pairs (e.g., "caching_policy.mode=force_refresh").')
    parser.add_argument('--force-renew', help='Comma-separated list of items to force-renew: cookies, visitor_id, po_token, nsig_cache, info_json, all.')
    parser.add_argument('--lang', help='Language code for the request (e.g., "fr", "ja"). Affects metadata language.')
    parser.add_argument('--timezone', help='Timezone for the request (e.g., "UTC", "America/New_York"). Note: experimental, may not be fully supported.')
    return parser
|
|
|
|
def main_get_info(args):
    """Main logic for the 'get-info' command.

    Connects to the Thrift service, requests an info.json for ``args.url``
    (via either the direct yt-dlp path or the token-based path), prints
    diagnostics to stderr, and optionally writes results to disk.

    Args:
        args: Parsed argparse namespace produced by add_get_info_parser().

    Returns:
        Process exit code: 0 on success (including non-fatal age-gate
        responses), 1 on any error.
    """
    exit_code = 0

    # Set log level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Optionally mirror all logs to an auto-named file: VIDEOID-DATETIME.log
    if args.log_file_auto:
        video_id = get_video_id(args.url)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_filename = f"{video_id}-{timestamp}.log"

        # Attach the file handler to the ROOT logger so output from every
        # module (not just this one) is captured in the file.
        root_logger = logging.getLogger()
        file_handler = logging.FileHandler(log_filename)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

        logger.info(f"Logging to file: {log_filename}")

    transport = None
    try:
        # Determine host and port, giving precedence to --auth-* args
        host = args.auth_host or args.host
        port = args.auth_port or args.port

        # Create Thrift client
        client, transport = get_thrift_client(host, port)

        # Get token data, which includes the info.json
        if args.direct:
            # Direct path: the server runs yt-dlp itself, bypassing the
            # Node.js token-generation machinery.
            logger.info(f"Requesting info.json for URL '{args.url}' using DIRECT method.")
            if args.client:
                logger.info(f"Requesting to use specific client(s): {args.client}")
            else:
                logger.info("No specific client requested, server will let yt-dlp decide.")
            token_data = client.getInfoJsonDirect(url=args.url, clients=args.client)
        else:
            logger.info(f"Requesting info.json for URL '{args.url}' using profile '{args.profile}'")

            # Prepare arguments for the Thrift call
            machine_id = args.machine_id
            if not machine_id:
                import socket
                machine_id = socket.gethostname()
                logger.info(f"No machine ID provided, using hostname: {machine_id}")

            # --request-params-json accepts a JSON object or a comma-separated
            # key=value string (see parse_key_value_params).
            request_params = {}
            if args.request_params_json:
                try:
                    request_params = json.loads(args.request_params_json)
                except json.JSONDecodeError:
                    logger.info("Could not parse --request-params-json as JSON, trying as key-value string.")
                    request_params = parse_key_value_params(args.request_params_json)

            if args.force_renew:
                items_to_renew = [item.strip() for item in args.force_renew.split(',')]
                request_params['force_renew'] = items_to_renew
                logger.info(f"Requesting force renew for: {items_to_renew}")

            if args.lang:
                session_params = request_params.setdefault('session_params', {})
                session_params['lang'] = args.lang
                logger.info(f"Requesting language: {args.lang}")

            if args.timezone:
                session_params = request_params.setdefault('session_params', {})
                session_params['timeZone'] = args.timezone
                logger.info(f"Requesting timezone: {args.timezone}")

            if args.verbose:
                # Add verbose flag for yt-dlp on the server
                ytdlp_params = request_params.setdefault('ytdlp_params', {})
                ytdlp_params['verbose'] = True
                logger.info("Verbose mode enabled, requesting verbose yt-dlp logs from server.")

            thrift_args = {
                'accountId': args.profile,
                'updateType': TokenUpdateMode.AUTO,
                'url': args.url,
                'clients': args.client,
                'machineId': machine_id,
                'airflowLogContext': None,
                # Only send params when there is something to send.
                'requestParamsJson': json.dumps(request_params) if request_params else None,
                'assignedProxyUrl': args.assigned_proxy_url
            }

            # Handle proxy renaming (sed-style s/pattern/replacement/ rule).
            assigned_proxy = args.assigned_proxy_url
            if assigned_proxy and args.proxy_rename:
                rename_rule = args.proxy_rename.strip("'\"")
                # NOTE(review): splitting on every '/' means a pattern or
                # replacement containing '/' is silently truncated
                # (parts[3:] are discarded) — confirm intended.
                if rename_rule.startswith('s/') and rename_rule.count('/') >= 2:
                    try:
                        parts = rename_rule.split('/')
                        pattern = parts[1]
                        replacement = parts[2]
                        original_proxy = assigned_proxy
                        assigned_proxy = re.sub(pattern, replacement, assigned_proxy)
                        logger.info(f"Renamed proxy URL from '{original_proxy}' to '{assigned_proxy}' using rule '{rename_rule}'")
                    except re.error as e:
                        logger.error(f"Invalid regex in --proxy-rename: {e}")
                        return 1
                    except IndexError:
                        logger.error("Invalid --proxy-rename format. Expected: s/pattern/replacement/")
                        return 1
                else:
                    logger.error("Invalid --proxy-rename format. Expected: s/pattern/replacement/")
                    return 1
            # Re-assign so the (possibly renamed) proxy is what gets sent.
            thrift_args['assignedProxyUrl'] = assigned_proxy

            if args.client:
                logger.info(f"Requesting to use specific client: {args.client}")
            else:
                logger.info("No specific client requested, server will use its default.")

            token_data = client.getOrRefreshToken(**thrift_args)

        if args.print_proxy:
            if hasattr(token_data, 'socks') and token_data.socks:
                print(f"Proxy used: {token_data.socks}", file=sys.stderr)
            else:
                print("Proxy information not available in response.", file=sys.stderr)

        # Bail out when the server response lacks a usable info.json payload.
        if not token_data or not hasattr(token_data, 'infoJson') or not token_data.infoJson:
            logger.error("Server did not return valid info.json data.")
            if args.verbose:
                logger.debug(f"Received token_data from server: {token_data!r}")
            if not token_data:
                logger.error("Reason: The entire token_data object received from the server is null.")
            elif not hasattr(token_data, 'infoJson'):
                logger.error("Reason: The received token_data object does not have an 'infoJson' attribute.")
            elif not token_data.infoJson:
                logger.error("Reason: The 'infoJson' attribute in the received token_data object is empty or null.")

            print("Error: Server did not return valid info.json data.", file=sys.stderr)
            return 1

        info_json_str = token_data.infoJson

        # On success, print summary info to stderr for visibility.
        # This provides immediate feedback without interfering with piped stdout.
        if hasattr(token_data, 'serverVersionInfo') and token_data.serverVersionInfo:
            # Filter out the default params line as requested
            filtered_info = '\n'.join(
                line for line in token_data.serverVersionInfo.split('\n')
                if 'Default yt-dlp CLI params:' not in line
            )
            print(f"\n--- Server Version Info ---\n{filtered_info}", file=sys.stderr)

        # NOTE(review): redundant re-assignment — info_json_str already holds
        # token_data.infoJson from above.
        info_json_str = token_data.infoJson
        # Pre-parse the payload for summary analysis below; None means the
        # payload was not valid JSON (handled by the error path later).
        info_data_for_analysis: Optional[Dict[str, Any]] = None
        try:
            info_data_for_analysis = json.loads(info_json_str)
        except (json.JSONDecodeError, TypeError):
            pass  # Will be handled later if info_json is invalid

        if hasattr(token_data, 'requestSummary') and token_data.requestSummary:
            try:
                summary_data = json.loads(token_data.requestSummary)
                summary_text = summary_data.get('summary', token_data.requestSummary)

                # --- Client-side summary correction and enhancement ---
                # The server may report the GVS PO token as not fetched even
                # when the format URLs prove one was used; detect 'pot=' in
                # any format URL and correct the summary line accordingly.
                gvs_pot_used = False
                if isinstance(info_data_for_analysis, dict):
                    for f in info_data_for_analysis.get('formats', []):
                        if 'pot=' in f.get('url', ''):
                            gvs_pot_used = True
                            break

                if gvs_pot_used and 'PO Token (GVS): not_fetched' in summary_text:
                    summary_text = summary_text.replace(
                        'PO Token (GVS): not_fetched',
                        'PO Token (GVS): bgutil:http (verified from format URL)'
                    )

                if 'Visitor ID Source: omitted_for_tv_client' in summary_text:
                    summary_text = summary_text.replace(
                        'Visitor ID Source: omitted_for_tv_client',
                        'Visitor ID Source: omitted_for_tv_client (handled internally by yt-dlp)'
                    )
                    # Add a note that we cannot display it.
                    summary_text += "\n - Visitor ID Value: Not exposed by server for TV clients to avoid detection."

                print(f"\n--- Request Summary ---\n{summary_text}", file=sys.stderr)
            except json.JSONDecodeError:
                # Fallback for old format or non-JSON summary
                print(f"\n--- Request Summary ---\n{token_data.requestSummary}", file=sys.stderr)

        # Print detailed logs only if explicitly requested
        # (--log-return implies all three show-* flags).
        if hasattr(token_data, 'requestSummary') and token_data.requestSummary:
            try:
                summary_data = json.loads(token_data.requestSummary)
                if args.show_prefetch_log or args.log_return:
                    print("\n--- Prefetch Log ---", file=sys.stderr)
                    print(summary_data.get('prefetch_log', 'Not available.'), file=sys.stderr)
                if args.show_nodejs_log or args.log_return:
                    print("\n--- Node.js Log ---", file=sys.stderr)
                    print(summary_data.get('nodejs_log', 'Not available.'), file=sys.stderr)
                if args.show_ytdlp_log or args.log_return:
                    print("\n--- yt-dlp Log ---", file=sys.stderr)
                    print(summary_data.get('ytdlp_log', 'Not available.'), file=sys.stderr)
            except json.JSONDecodeError:
                pass  # Fallback already handled above

        if hasattr(token_data, 'communicationLogPaths') and token_data.communicationLogPaths:
            logger.info("--- Communication Log Paths ---")
            for log_path in token_data.communicationLogPaths:
                logger.info(f" - {log_path}")

        # Check if the returned info.json is an error report
        try:
            info_data = json.loads(info_json_str)
            # Record which proxy produced this info.json alongside the data.
            # NOTE(review): this assumes the top-level JSON value is a dict;
            # a top-level array would raise TypeError here (caught only by
            # the generic Exception handler below) — confirm acceptable.
            if hasattr(token_data, 'socks') and token_data.socks:
                info_data['_proxy_url'] = token_data.socks
            if isinstance(info_data, dict) and 'error' in info_data:
                error_code = info_data.get('errorCode', 'N/A')
                error_message = info_data.get('message', info_data.get('error', 'Unknown error'))
                logger.error(f"Server returned an error in info.json (Code: {error_code}): {error_message}")
                print(f"Error from server (Code: {error_code}): {error_message}", file=sys.stderr)
                # Optionally print the full error JSON
                if args.verbose:
                    print(json.dumps(info_data, indent=2), file=sys.stderr)
                # Not a hard failure yet: file writing below still proceeds,
                # but the process will exit non-zero.
                exit_code = 1
        except json.JSONDecodeError:
            logger.error(f"Failed to parse info.json from server: {info_json_str[:200]}...")
            print("Error: Failed to parse the info.json response from the server.", file=sys.stderr)
            return 1

        logger.info(f"Successfully retrieved info.json ({len(info_json_str)} bytes)")

        # Save to latest-info.json if requested, or if using --output-auto-url-only for convenience
        if args.save_latest or args.output_auto_url_only:
            # File names are prefixed with the worker id when one is given,
            # so concurrent workers don't clobber each other's "latest" files.
            base_latest_filename = f"{args.worker_id}-latest" if args.worker_id else "latest"
            latest_info_filename = f"{base_latest_filename}-info.json"
            latest_proxy_filename = f"{base_latest_filename}-proxy.txt"

            try:
                with open(latest_info_filename, 'w', encoding='utf-8') as f:
                    json.dump(info_data, f, indent=2)
                logger.info(f"Wrote info.json to {latest_info_filename}")
                print(f"Successfully saved info.json to {latest_info_filename}", file=sys.stderr)
            except IOError as e:
                # Best-effort: a failed "latest" copy does not fail the run.
                logger.error(f"Failed to write to {latest_info_filename}: {e}")
                print(f"Error: Failed to write to {latest_info_filename}: {e}", file=sys.stderr)

            if hasattr(token_data, 'socks') and token_data.socks:
                try:
                    with open(latest_proxy_filename, 'w', encoding='utf-8') as f:
                        f.write(token_data.socks + '\n')
                    logger.info(f"Wrote proxy to {latest_proxy_filename}")
                    print(f"Successfully saved proxy to {latest_proxy_filename}", file=sys.stderr)
                except IOError as e:
                    logger.error(f"Failed to write to {latest_proxy_filename}: {e}")
                    print(f"Error: Failed to write to {latest_proxy_filename}: {e}", file=sys.stderr)

        # Determine output file path if auto-naming is used
        output_file = args.output
        if args.output_auto or args.output_auto_url_only:
            video_id = get_video_id(args.url)
            suffix = args.output_auto_suffix or ""
            if args.output_auto:
                # Full auto-name: DATETIME-CLIENT-VIDEOID[SUFFIX]-info.json
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                client_id = args.client or args.profile
                base_filename = f"{timestamp}-{client_id}-{video_id}{suffix}"
                output_file = f"{base_filename}-info.json"

                # Save invocation data (request metadata) next to the info.json.
                invocation_filename = f"{base_filename}-invocation.json"
                invocation_data = {}
                for attr in ['ytdlpCommand', 'socks', 'jobId', 'url', 'requestSummary', 'communicationLogPaths']:
                    if hasattr(token_data, attr):
                        value = getattr(token_data, attr)
                        if value:
                            invocation_data[attr] = value

                # Only record the cookie blob's presence/size, not its contents.
                if hasattr(token_data, 'cookiesBlob') and token_data.cookiesBlob:
                    invocation_data['cookiesBlob'] = f"present, {len(token_data.cookiesBlob)} bytes"
                else:
                    invocation_data['cookiesBlob'] = "not present"

                try:
                    with open(invocation_filename, 'w', encoding='utf-8') as f:
                        json.dump(invocation_data, f, indent=2)
                    logger.info(f"Wrote invocation data to {invocation_filename}")
                except IOError as e:
                    logger.error(f"Failed to write invocation data to {invocation_filename}: {e}")

            else:  # args.output_auto_url_only
                output_file = f"{video_id}{suffix}-info.json"

        # Write to output file if specified
        if output_file:
            try:
                # Ensure the output directory exists before writing the file
                output_dir = os.path.dirname(output_file)
                if output_dir:
                    os.makedirs(output_dir, exist_ok=True)

                with open(output_file, 'w', encoding='utf-8') as f:
                    # Pretty-print the JSON to the file
                    json.dump(info_data, f, indent=2)
                logger.info(f"Wrote info.json to {output_file}")
                # Print success message to stderr to not interfere with stdout piping
                print(f"Successfully saved info.json to {output_file}", file=sys.stderr)

                # If --output-auto, save invocation data
                if args.output_auto:
                    pass  # The latest-info.json logic is now handled by --save-latest

            except IOError as e:
                logger.error(f"Failed to write to output file {output_file}: {e}")
                print(f"Error: Failed to write to output file {output_file}: {e}", file=sys.stderr)
                return 1

        # Print the JSON to stdout if requested, to allow for piping.
        if args.print_info_out:
            print(json.dumps(info_data, indent=2))

        return exit_code
    except (PBServiceException, PBUserException) as e:
        # Check for non-fatal age-gate errors. These are expected for certain videos
        # and should not cause the entire stress test to fail.
        is_age_gate_error = hasattr(e, 'errorCode') and e.errorCode == 'AGE_GATED_SIGN_IN'

        if is_age_gate_error:
            logger.warning(f"Age-gated content detected for URL '{args.url}'. Treating as a non-fatal warning.")
            print(f"Warning: Age-gated content detected for '{args.url}'.", file=sys.stderr)

            # To avoid breaking downstream parsers, output a valid JSON error object.
            # This allows stress testers to see a 'success' (exit 0) but still know it was an age gate issue.
            error_json = {
                "error": "Age-gated content",
                "errorCode": "AGE_GATE",
                "message": "Sign in to confirm your age."
            }
            print(json.dumps(error_json, indent=2))

            # We return success because this is not a system failure.
            return 0

        # Format message for better readability, ensuring newlines are handled.
        message = str(e.message or '')
        try:
            # Attempt to decode as if it has escaped newlines (e.g., '\\n' -> '\n')
            message = codecs.decode(message, 'unicode_escape')
        except Exception:
            # Fallback for safety, though unicode_escape is robust
            message = message.replace('\\n', '\n')

        # For known user-facing errors, suppress the full traceback unless verbose is explicitly on.
        # The goal is to provide a clean error message for common issues.
        user_facing_errors = [
            "BOT_DETECTED", "BOT_DETECTION_SIGN_IN_REQUIRED",
            "VIDEO_UNAVAILABLE", "PRIVATE_VIDEO", "VIDEO_REMOVED",
            "AGE_GATED_SIGN_IN", "MEMBERS_ONLY", "VIDEO_PROCESSING", "GEO_RESTRICTED"
        ]
        is_user_facing_error = hasattr(e, 'errorCode') and e.errorCode in user_facing_errors

        # Only show full traceback in verbose mode AND if it's NOT a common user-facing error.
        show_exc_info = args.verbose and not is_user_facing_error

        logger.error(f"A Thrift error occurred: {message}", exc_info=show_exc_info)
        print(f"\n--- ERROR ---", file=sys.stderr)
        print(f"{message}", file=sys.stderr)

        # Context is shown for unexpected errors, or for any error in verbose mode.
        if hasattr(e, 'context') and e.context and (args.verbose or not is_user_facing_error):
            print(f"\n--- CONTEXT ---", file=sys.stderr)
            # The context is a dict from thrift. Pretty print it, handling newlines in values.
            if isinstance(e.context, dict):
                # Process each value to un-escape newlines for clean printing
                processed_context = {}
                for key, value in e.context.items():
                    try:
                        processed_context[key] = codecs.decode(str(value), 'unicode_escape')
                    except Exception:
                        processed_context[key] = str(value).replace('\\n', '\n')
                print(json.dumps(processed_context, indent=2), file=sys.stderr)
            else:
                # Fallback for non-dict context
                print(str(e.context), file=sys.stderr)
        print("\n", file=sys.stderr)
        return 1
    except TTransport.TTransportException as e:
        logger.error(f"Connection to server failed: {e}", exc_info=args.verbose)
        # NOTE(review): this message prints args.host/args.port, but the
        # connection actually used the resolved host/port (which honor
        # --auth-host/--auth-port) — the message can name the wrong endpoint.
        print(f"Error: Connection to server at {args.host}:{args.port} failed.", file=sys.stderr)
        return 1
    except Exception as e:
        # Last-resort handler: log with traceback and fail the run.
        logger.exception(f"An unexpected error occurred: {e}")
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        return 1
    finally:
        # Always close the Thrift transport, whatever path we took above.
        if transport and transport.isOpen():
            transport.close()
            logger.info("Thrift connection closed.")
|