Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions twitter_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,23 +1163,37 @@ def operation(client: TwitterClient) -> WritePayload:
@click.argument("text")
@click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.")
@click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.")
@click.option("--video", "-V", "video", default=None, type=click.Path(exists=True), help="Attach a single video (mp4/mov, up to 512 MB). Mutually exclusive with --image.")
@structured_output_options
def post(text, reply_to, images, as_json, as_yaml):
# type: (str, Optional[str], tuple, bool, bool) -> None
def post(text, reply_to, images, video, as_json, as_yaml):
# type: (str, Optional[str], tuple, Optional[str], bool, bool) -> None
"""Post a new tweet. TEXT is the tweet content.

Attach images with --image / -i (up to 4):

\b
twitter post "Hello!" --image photo.jpg
twitter post "Gallery" -i a.png -i b.png -i c.jpg

Or attach a single video with --video / -V (mutually exclusive with --image):

\b
twitter post "Demo" --video clip.mp4
"""
if video and images:
raise click.UsageError("--video and --image are mutually exclusive (X allows either up to 4 images, or 1 video).")

normalized_reply_to = _normalize_tweet_id(reply_to) if reply_to else None
action = "Replying to %s" % normalized_reply_to if normalized_reply_to else "Posting tweet"
rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml)

def operation(client: TwitterClient) -> WritePayload:
media_ids = _upload_images(client, images, rich_output=rich_output)
if video:
if rich_output:
console.print("🎬 Uploading video (this may take a while for transcode)...")
media_ids = [client.upload_media(video)]
else:
media_ids = _upload_images(client, images, rich_output=rich_output)
tweet_id = client.create_tweet(text, reply_to_id=normalized_reply_to, media_ids=media_ids or None)
return {"success": True, "action": "post", "id": tweet_id, "url": "https://x.com/i/status/%s" % tweet_id}

Expand Down
202 changes: 169 additions & 33 deletions twitter_cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ def fetch_following(self, user_id, count=20):
# Supported image MIME types and max file size (5 MB)
_SUPPORTED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
_MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5 MB
_SUPPORTED_VIDEO_TYPES = {"video/mp4", "video/quicktime"}
_MAX_VIDEO_SIZE = 512 * 1024 * 1024 # 512 MB (X Premium upper limit)
_VIDEO_CHUNK_SIZE = 4 * 1024 * 1024 # 4 MB per APPEND segment
_VIDEO_STATUS_TIMEOUT = 300 # 5 minutes total wait for transcode

def _write_delay(self):
# type: () -> None
Expand All @@ -483,25 +487,32 @@ def _write_delay(self):

def upload_media(self, file_path):
# type: (str) -> str
"""Upload an image file to Twitter. Returns the media_id string.
"""Upload an image or video file to Twitter. Returns the media_id string.

Uses Twitter's chunked upload API (INIT → APPEND → FINALIZE).
Supports JPEG, PNG, GIF, and WebP images up to 5 MB.
Uses Twitter's chunked upload API (INIT → APPEND → FINALIZE [→ STATUS]).
Images: JPEG / PNG / GIF / WebP up to 5 MB (single APPEND segment).
Video : MP4 / MOV up to 512 MB (multi-segment APPEND + STATUS polling
until server-side transcode succeeds).
"""
if not os.path.isfile(file_path):
raise MediaUploadError("File not found: %s" % file_path)

file_size = os.path.getsize(file_path)
if file_size > self._MAX_IMAGE_SIZE:
media_type = mimetypes.guess_type(file_path)[0] or ""
is_video = media_type in self._SUPPORTED_VIDEO_TYPES
is_image = media_type in self._SUPPORTED_IMAGE_TYPES

if not (is_video or is_image):
raise MediaUploadError(
"File too large: %.1f MB (max %.0f MB)"
% (file_size / (1024 * 1024), self._MAX_IMAGE_SIZE / (1024 * 1024))
"Unsupported media format: %s (supported: jpeg, png, gif, webp, mp4, mov)"
% media_type
)

media_type = mimetypes.guess_type(file_path)[0] or ""
if media_type not in self._SUPPORTED_IMAGE_TYPES:
max_size = self._MAX_VIDEO_SIZE if is_video else self._MAX_IMAGE_SIZE
if file_size > max_size:
raise MediaUploadError(
"Unsupported image format: %s (supported: jpeg, png, gif, webp)" % media_type
"File too large: %.1f MB (max %.0f MB)"
% (file_size / (1024 * 1024), max_size / (1024 * 1024))
)

upload_url = "https://upload.twitter.com/i/media/upload.json"
Expand All @@ -515,6 +526,8 @@ def upload_media(self, file_path):
"total_bytes": str(file_size),
"media_type": media_type,
}
if is_video:
init_data["media_category"] = "tweet_video"
resp = session.post(upload_url, headers=headers, data=init_data, timeout=30)
if resp.status_code >= 400:
raise MediaUploadError("INIT failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
Expand All @@ -525,25 +538,53 @@ def upload_media(self, file_path):
media_id = init_result.get("media_id_string", "")
if not media_id:
raise MediaUploadError("INIT did not return media_id")
logger.info("Media INIT: media_id=%s", media_id)
logger.info("Media INIT: media_id=%s (kind=%s)", media_id, "video" if is_video else "image")

# ── APPEND ───────────────────────────────────────────────────
with open(file_path, "rb") as f:
media_data = base64.b64encode(f.read()).decode("ascii")

headers = self._build_headers(url=upload_url, method="POST")
# Remove JSON content-type — curl_cffi handles multipart encoding
headers.pop("Content-Type", None)
append_data = {
"command": "APPEND",
"media_id": media_id,
"segment_index": "0",
"media_data": media_data,
}
resp = session.post(upload_url, headers=headers, data=append_data, timeout=60)
if resp.status_code >= 400:
raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
logger.info("Media APPEND: segment 0 uploaded")
if is_video:
# 视频分多段(4 MB / segment),避免大块 base64 撑爆请求
with open(file_path, "rb") as f:
segment_index = 0
while True:
chunk = f.read(self._VIDEO_CHUNK_SIZE)
if not chunk:
break
media_data = base64.b64encode(chunk).decode("ascii")
headers = self._build_headers(url=upload_url, method="POST")
headers.pop("Content-Type", None)
append_data = {
"command": "APPEND",
"media_id": media_id,
"segment_index": str(segment_index),
"media_data": media_data,
}
resp = session.post(upload_url, headers=headers, data=append_data, timeout=180)
Comment on lines +552 to +561
if resp.status_code >= 400:
raise MediaUploadError(
"APPEND segment %d failed (HTTP %d): %s"
% (segment_index, resp.status_code, resp.text[:300])
)
logger.info(
"Media APPEND: segment %d uploaded (%d bytes)",
segment_index, len(chunk),
)
segment_index += 1
else:
# 图片单段
with open(file_path, "rb") as f:
media_data = base64.b64encode(f.read()).decode("ascii")
headers = self._build_headers(url=upload_url, method="POST")
headers.pop("Content-Type", None)
append_data = {
"command": "APPEND",
"media_id": media_id,
"segment_index": "0",
"media_data": media_data,
}
resp = session.post(upload_url, headers=headers, data=append_data, timeout=60)
if resp.status_code >= 400:
raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
logger.info("Media APPEND: segment 0 uploaded")

# ── FINALIZE ─────────────────────────────────────────────────
headers = self._build_headers(url=upload_url, method="POST")
Expand All @@ -555,14 +596,71 @@ def upload_media(self, file_path):
resp = session.post(upload_url, headers=headers, data=finalize_data, timeout=30)
if resp.status_code >= 400:
raise MediaUploadError("FINALIZE failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]))
logger.info("Media FINALIZE: media_id=%s ready", media_id)
try:
finalize_result = json.loads(resp.text)
except (json.JSONDecodeError, ValueError):
finalize_result = {}
logger.info("Media FINALIZE: media_id=%s", media_id)

# ── STATUS(视频独有)─────────────────────────────────────────
if is_video:
processing_info = finalize_result.get("processing_info", {}) or {}
state = processing_info.get("state", "succeeded")
elapsed = 0
while state in ("pending", "in_progress"):
check_after = int(processing_info.get("check_after_secs", 5) or 5)
time.sleep(check_after)
elapsed += check_after
if elapsed > self._VIDEO_STATUS_TIMEOUT:
raise MediaUploadError("Video transcode timeout after %ds" % elapsed)
status_url = (
"%s?command=STATUS&media_id=%s" % (upload_url, media_id)
)
headers = self._build_headers(url=status_url, method="GET")
resp = session.get(status_url, headers=headers, timeout=30)
if resp.status_code >= 400:
raise MediaUploadError(
"STATUS failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])
)
try:
status_result = json.loads(resp.text)
except (json.JSONDecodeError, ValueError):
raise MediaUploadError("STATUS returned invalid JSON")
processing_info = status_result.get("processing_info", {}) or {}
state = processing_info.get("state", "succeeded")
logger.info("Media STATUS: state=%s (elapsed=%ds)", state, elapsed)
if state == "failed":
err = processing_info.get("error", {})
raise MediaUploadError(
"Video transcode failed: name=%s message=%s"
% (err.get("name", ""), err.get("message", ""))
)

logger.info("Media ready: media_id=%s", media_id)
return media_id

@staticmethod
def _weighted_length(text):
# type: (str) -> int
"""Return X's weighted character count for a tweet.

X counts basic ASCII characters as 1 and most other characters (CJK,
emoji, etc.) as 2. The standard tweet limit is 280 weighted units;
anything above must go through the CreateNoteTweet endpoint (X
Premium long-form post).
"""
n = 0
for ch in text:
n += 1 if ord(ch) < 0x80 else 2
return n
Comment on lines +642 to +655

def create_tweet(self, text, reply_to_id=None, media_ids=None):
# type: (str, Optional[str], Optional[List[str]]) -> str
"""Post a new tweet. Returns the new tweet ID.

Automatically routes to the CreateNoteTweet GraphQL endpoint when the
weighted character count exceeds 280 (X Premium long-form post).

Args:
text: Tweet text content.
reply_to_id: Optional tweet ID to reply to.
Expand All @@ -582,12 +680,32 @@ def create_tweet(self, text, reply_to_id=None, media_ids=None):
"in_reply_to_tweet_id": reply_to_id,
"exclude_reply_user_ids": [],
}
data = self._graphql_post("CreateTweet", variables, FEATURES)

# Route to long-form endpoint if the tweet exceeds the standard limit.
weighted = self._weighted_length(text)
if weighted > 280:
op_name = "CreateNoteTweet"
# CreateNoteTweet requires this field; without it X returns
# HTTP 200 with an empty tweet_results object (silent failure).
variables["disallowed_reply_options"] = None
logger.info(
"Tweet weighted=%d > 280, using CreateNoteTweet (long-form)",
weighted,
)
else:
op_name = "CreateTweet"

data = self._graphql_post(op_name, variables, FEATURES)
self._write_delay()
result = _deep_get(data, "data", "create_tweet", "tweet_results", "result")
if op_name == "CreateNoteTweet":
result = _deep_get(
data, "data", "notetweet_create", "tweet_results", "result"
)
else:
result = _deep_get(data, "data", "create_tweet", "tweet_results", "result")
if result:
return result.get("rest_id", "")
raise TwitterAPIError(0, "Failed to create tweet")
raise TwitterAPIError(0, "Failed to create tweet (op=%s)" % op_name)

def delete_tweet(self, tweet_id):
# type: (str) -> bool
Expand Down Expand Up @@ -697,6 +815,9 @@ def quote_tweet(self, tweet_id, text, media_ids=None):
# type: (str, str, Optional[List[str]]) -> str
"""Quote-tweet a tweet. Returns the new tweet ID.

Automatically routes to the CreateNoteTweet endpoint when the
weighted character count exceeds 280 (X Premium long-form quote).

Args:
tweet_id: The tweet ID to quote.
text: Commentary text.
Expand All @@ -712,12 +833,27 @@ def quote_tweet(self, tweet_id, text, media_ids=None):
"semantic_annotation_ids": [],
"dark_request": False,
}
data = self._graphql_post("CreateTweet", variables, FEATURES)
weighted = self._weighted_length(text)
if weighted > 280:
op_name = "CreateNoteTweet"
variables["disallowed_reply_options"] = None
logger.info(
"Quote weighted=%d > 280, using CreateNoteTweet (long-form quote)",
weighted,
)
else:
op_name = "CreateTweet"
data = self._graphql_post(op_name, variables, FEATURES)
self._write_delay()
result = _deep_get(data, "data", "create_tweet", "tweet_results", "result")
if op_name == "CreateNoteTweet":
result = _deep_get(
data, "data", "notetweet_create", "tweet_results", "result"
)
else:
result = _deep_get(data, "data", "create_tweet", "tweet_results", "result")
if result:
return result.get("rest_id", "")
raise TwitterAPIError(0, "Failed to create quote tweet")
raise TwitterAPIError(0, "Failed to create quote tweet (op=%s)" % op_name)

def follow_user(self, user_id):
# type: (str) -> bool
Expand Down