diff --git a/twitter_cli/cli.py b/twitter_cli/cli.py index d2dc523..ddfcabf 100644 --- a/twitter_cli/cli.py +++ b/twitter_cli/cli.py @@ -1163,9 +1163,10 @@ def operation(client: TwitterClient) -> WritePayload: @click.argument("text") @click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.") @click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") +@click.option("--video", "-V", "video", default=None, type=click.Path(exists=True), help="Attach a single video (mp4/mov, up to 512 MB). Mutually exclusive with --image.") @structured_output_options -def post(text, reply_to, images, as_json, as_yaml): - # type: (str, Optional[str], tuple, bool, bool) -> None +def post(text, reply_to, images, video, as_json, as_yaml): + # type: (str, Optional[str], tuple, Optional[str], bool, bool) -> None """Post a new tweet. TEXT is the tweet content. Attach images with --image / -i (up to 4): @@ -1173,13 +1174,26 @@ def post(text, reply_to, images, as_json, as_yaml): \b twitter post "Hello!" --image photo.jpg twitter post "Gallery" -i a.png -i b.png -i c.jpg + + Or attach a single video with --video / -V (mutually exclusive with --image): + + \b + twitter post "Demo" --video clip.mp4 """ + if video and images: + raise click.UsageError("--video and --image are mutually exclusive (X allows either up to 4 images, or 1 video).") + normalized_reply_to = _normalize_tweet_id(reply_to) if reply_to else None action = "Replying to %s" % normalized_reply_to if normalized_reply_to else "Posting tweet" rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - media_ids = _upload_images(client, images, rich_output=rich_output) + if video: + if rich_output: + console.print("๐ŸŽฌ Uploading video (this may take a while for transcode)...") + media_ids = [client.upload_media(video)] + else: + media_ids = _upload_images(client, images, rich_output=rich_output) tweet_id = client.create_tweet(text, reply_to_id=normalized_reply_to, media_ids=media_ids or None) return {"success": True, "action": "post", "id": tweet_id, "url": "https://x.com/i/status/%s" % tweet_id} diff --git a/twitter_cli/client.py b/twitter_cli/client.py index 0436c8e..1d830b7 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -473,6 +473,10 @@ def fetch_following(self, user_id, count=20): # Supported image MIME types and max file size (5 MB) _SUPPORTED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} _MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5 MB + _SUPPORTED_VIDEO_TYPES = {"video/mp4", "video/quicktime"} + _MAX_VIDEO_SIZE = 512 * 1024 * 1024 # 512 MB (X Premium upper limit) + _VIDEO_CHUNK_SIZE = 4 * 1024 * 1024 # 4 MB per APPEND segment + _VIDEO_STATUS_TIMEOUT = 300 # 5 minutes total wait for transcode def _write_delay(self): # type: () -> None @@ -483,25 +487,32 @@ def _write_delay(self): def upload_media(self, file_path): # type: (str) -> str - """Upload an image file to Twitter. Returns the media_id string. + """Upload an image or video file to Twitter. Returns the media_id string. - Uses Twitter's chunked upload API (INIT โ†’ APPEND โ†’ FINALIZE). - Supports JPEG, PNG, GIF, and WebP images up to 5 MB. + Uses Twitter's chunked upload API (INIT โ†’ APPEND โ†’ FINALIZE [โ†’ STATUS]). + Images: JPEG / PNG / GIF / WebP up to 5 MB (single APPEND segment). + Video : MP4 / MOV up to 512 MB (multi-segment APPEND + STATUS polling + until server-side transcode succeeds). """ if not os.path.isfile(file_path): raise MediaUploadError("File not found: %s" % file_path) file_size = os.path.getsize(file_path) - if file_size > self._MAX_IMAGE_SIZE: + media_type = mimetypes.guess_type(file_path)[0] or "" + is_video = media_type in self._SUPPORTED_VIDEO_TYPES + is_image = media_type in self._SUPPORTED_IMAGE_TYPES + + if not (is_video or is_image): raise MediaUploadError( - "File too large: %.1f MB (max %.0f MB)" - % (file_size / (1024 * 1024), self._MAX_IMAGE_SIZE / (1024 * 1024)) + "Unsupported media format: %s (supported: jpeg, png, gif, webp, mp4, mov)" + % media_type ) - media_type = mimetypes.guess_type(file_path)[0] or "" - if media_type not in self._SUPPORTED_IMAGE_TYPES: + max_size = self._MAX_VIDEO_SIZE if is_video else self._MAX_IMAGE_SIZE + if file_size > max_size: raise MediaUploadError( - "Unsupported image format: %s (supported: jpeg, png, gif, webp)" % media_type + "File too large: %.1f MB (max %.0f MB)" + % (file_size / (1024 * 1024), max_size / (1024 * 1024)) ) upload_url = "https://upload.twitter.com/i/media/upload.json" @@ -515,6 +526,8 @@ def upload_media(self, file_path): "total_bytes": str(file_size), "media_type": media_type, } + if is_video: + init_data["media_category"] = "tweet_video" resp = session.post(upload_url, headers=headers, data=init_data, timeout=30) if resp.status_code >= 400: raise MediaUploadError("INIT failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) @@ -525,25 +538,53 @@ def upload_media(self, file_path): media_id = init_result.get("media_id_string", "") if not media_id: raise MediaUploadError("INIT did not return media_id") - logger.info("Media INIT: media_id=%s", media_id) + logger.info("Media INIT: media_id=%s (kind=%s)", media_id, "video" if is_video else "image") # โ”€โ”€ APPEND โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - with open(file_path, "rb") as f: - media_data = base64.b64encode(f.read()).decode("ascii") - - headers = self._build_headers(url=upload_url, method="POST") - # Remove JSON content-type โ€” curl_cffi handles multipart encoding - headers.pop("Content-Type", None) - append_data = { - "command": "APPEND", - "media_id": media_id, - "segment_index": "0", - "media_data": media_data, - } - resp = session.post(upload_url, headers=headers, data=append_data, timeout=60) - if resp.status_code >= 400: - raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) - logger.info("Media APPEND: segment 0 uploaded") + if is_video: + # ่ง†้ข‘ๅˆ†ๅคšๆฎต๏ผˆ4 MB / segment๏ผ‰๏ผŒ้ฟๅ…ๅคงๅ— base64 ๆ’‘็ˆ†่ฏทๆฑ‚ + with open(file_path, "rb") as f: + segment_index = 0 + while True: + chunk = f.read(self._VIDEO_CHUNK_SIZE) + if not chunk: + break + media_data = base64.b64encode(chunk).decode("ascii") + headers = self._build_headers(url=upload_url, method="POST") + headers.pop("Content-Type", None) + append_data = { + "command": "APPEND", + "media_id": media_id, + "segment_index": str(segment_index), + "media_data": media_data, + } + resp = session.post(upload_url, headers=headers, data=append_data, timeout=180) + if resp.status_code >= 400: + raise MediaUploadError( + "APPEND segment %d failed (HTTP %d): %s" + % (segment_index, resp.status_code, resp.text[:300]) + ) + logger.info( + "Media APPEND: segment %d uploaded (%d bytes)", + segment_index, len(chunk), + ) + segment_index += 1 + else: + # ๅ›พ็‰‡ๅ•ๆฎต + with open(file_path, "rb") as f: + media_data = base64.b64encode(f.read()).decode("ascii") + headers = self._build_headers(url=upload_url, method="POST") + headers.pop("Content-Type", None) + append_data = { + "command": "APPEND", + "media_id": media_id, + "segment_index": "0", + "media_data": media_data, + } + resp = session.post(upload_url, headers=headers, data=append_data, timeout=60) + if resp.status_code >= 400: + raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) + logger.info("Media APPEND: segment 0 uploaded") # โ”€โ”€ FINALIZE โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ headers = self._build_headers(url=upload_url, method="POST") @@ -555,14 +596,71 @@ def upload_media(self, file_path): resp = session.post(upload_url, headers=headers, data=finalize_data, timeout=30) if resp.status_code >= 400: raise MediaUploadError("FINALIZE failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) - logger.info("Media FINALIZE: media_id=%s ready", media_id) + try: + finalize_result = json.loads(resp.text) + except (json.JSONDecodeError, ValueError): + finalize_result = {} + logger.info("Media FINALIZE: media_id=%s", media_id) + + # โ”€โ”€ STATUS๏ผˆ่ง†้ข‘็‹ฌๆœ‰๏ผ‰โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + if is_video: + processing_info = finalize_result.get("processing_info", {}) or {} + state = processing_info.get("state", "succeeded") + elapsed = 0 + while state in ("pending", "in_progress"): + check_after = int(processing_info.get("check_after_secs", 5) or 5) + time.sleep(check_after) + elapsed += check_after + if elapsed > self._VIDEO_STATUS_TIMEOUT: + raise MediaUploadError("Video transcode timeout after %ds" % elapsed) + status_url = ( + "%s?command=STATUS&media_id=%s" % (upload_url, media_id) + ) + headers = self._build_headers(url=status_url, method="GET") + resp = session.get(status_url, headers=headers, timeout=30) + if resp.status_code >= 400: + raise MediaUploadError( + "STATUS failed (HTTP %d): %s" % (resp.status_code, resp.text[:300]) + ) + try: + status_result = json.loads(resp.text) + except (json.JSONDecodeError, ValueError): + raise MediaUploadError("STATUS returned invalid JSON") + processing_info = status_result.get("processing_info", {}) or {} + state = processing_info.get("state", "succeeded") + logger.info("Media STATUS: state=%s (elapsed=%ds)", state, elapsed) + if state == "failed": + err = processing_info.get("error", {}) + raise MediaUploadError( + "Video transcode failed: name=%s message=%s" + % (err.get("name", ""), err.get("message", "")) + ) + logger.info("Media ready: media_id=%s", media_id) return media_id + @staticmethod + def _weighted_length(text): + # type: (str) -> int + """Return X's weighted character count for a tweet. + + X counts basic ASCII characters as 1 and most other characters (CJK, + emoji, etc.) as 2. The standard tweet limit is 280 weighted units; + anything above must go through the CreateNoteTweet endpoint (X + Premium long-form post). + """ + n = 0 + for ch in text: + n += 1 if ord(ch) < 0x80 else 2 + return n + def create_tweet(self, text, reply_to_id=None, media_ids=None): # type: (str, Optional[str], Optional[List[str]]) -> str """Post a new tweet. Returns the new tweet ID. + Automatically routes to the CreateNoteTweet GraphQL endpoint when the + weighted character count exceeds 280 (X Premium long-form post). + Args: text: Tweet text content. reply_to_id: Optional tweet ID to reply to. @@ -582,12 +680,32 @@ def create_tweet(self, text, reply_to_id=None, media_ids=None): "in_reply_to_tweet_id": reply_to_id, "exclude_reply_user_ids": [], } - data = self._graphql_post("CreateTweet", variables, FEATURES) + + # Route to long-form endpoint if the tweet exceeds the standard limit. + weighted = self._weighted_length(text) + if weighted > 280: + op_name = "CreateNoteTweet" + # CreateNoteTweet requires this field; without it X returns + # HTTP 200 with an empty tweet_results object (silent failure). + variables["disallowed_reply_options"] = None + logger.info( + "Tweet weighted=%d > 280, using CreateNoteTweet (long-form)", + weighted, + ) + else: + op_name = "CreateTweet" + + data = self._graphql_post(op_name, variables, FEATURES) self._write_delay() - result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") + if op_name == "CreateNoteTweet": + result = _deep_get( + data, "data", "notetweet_create", "tweet_results", "result" + ) + else: + result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") if result: return result.get("rest_id", "") - raise TwitterAPIError(0, "Failed to create tweet") + raise TwitterAPIError(0, "Failed to create tweet (op=%s)" % op_name) def delete_tweet(self, tweet_id): # type: (str) -> bool @@ -697,6 +815,9 @@ def quote_tweet(self, tweet_id, text, media_ids=None): # type: (str, str, Optional[List[str]]) -> str """Quote-tweet a tweet. Returns the new tweet ID. + Automatically routes to the CreateNoteTweet endpoint when the + weighted character count exceeds 280 (X Premium long-form quote). + Args: tweet_id: The tweet ID to quote. text: Commentary text. @@ -712,12 +833,27 @@ def quote_tweet(self, tweet_id, text, media_ids=None): "semantic_annotation_ids": [], "dark_request": False, } - data = self._graphql_post("CreateTweet", variables, FEATURES) + weighted = self._weighted_length(text) + if weighted > 280: + op_name = "CreateNoteTweet" + variables["disallowed_reply_options"] = None + logger.info( + "Quote weighted=%d > 280, using CreateNoteTweet (long-form quote)", + weighted, + ) + else: + op_name = "CreateTweet" + data = self._graphql_post(op_name, variables, FEATURES) self._write_delay() - result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") + if op_name == "CreateNoteTweet": + result = _deep_get( + data, "data", "notetweet_create", "tweet_results", "result" + ) + else: + result = _deep_get(data, "data", "create_tweet", "tweet_results", "result") if result: return result.get("rest_id", "") - raise TwitterAPIError(0, "Failed to create quote tweet") + raise TwitterAPIError(0, "Failed to create quote tweet (op=%s)" % op_name) def follow_user(self, user_id): # type: (str) -> bool