diff --git a/transferwee.py b/transferwee.py index 8d7eb55..dd3806d 100755 --- a/transferwee.py +++ b/transferwee.py @@ -1,7 +1,4 @@ -#!/usr/bin/env python3 - -# -# Copyright (c) 2018-2023 Leonardo Taccari +#!/usr/bin/env python3 # # Copyright (c) 2018-2023 Leonardo Taccari # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -37,7 +34,9 @@ files from a `we.tl' or `wetransfer.com/downloads' URLs and upload files that will be shared via emails or link. """ - +import random +import string +import uuid from typing import Any, Dict, List, Optional, Union import binascii import functools @@ -54,9 +53,8 @@ WETRANSFER_API_URL = "https://wetransfer.com/api/v4/transfers" WETRANSFER_DOWNLOAD_URL = WETRANSFER_API_URL + "/{transfer_id}/download" -WETRANSFER_UPLOAD_EMAIL_URL = WETRANSFER_API_URL + "/email" +WETRANSFER_UPLOAD_URL = WETRANSFER_API_URL + "/{transfer_id}/passwordless" WETRANSFER_VERIFY_URL = WETRANSFER_API_URL + "/{transfer_id}/verify" -WETRANSFER_UPLOAD_LINK_URL = WETRANSFER_API_URL + "/link" WETRANSFER_FINALIZE_URL = WETRANSFER_API_URL + "/{transfer_id}/finalize" WETRANSFER_EXPIRE_IN = 604800 @@ -178,10 +176,6 @@ def _file_name_and_size(file: str) -> Dict[str, Union[int, str]]: def _prepare_session() -> Optional[requests.Session]: """Prepare a wetransfer.com session. - - Return a requests session that will always pass the required headers - and with cookies properly populated that can be used for wetransfer - requests. """ s = requests.Session() s.headers.update( @@ -190,14 +184,6 @@ def _prepare_session() -> Optional[requests.Session]: "x-requested-with": "XMLHttpRequest", } ) - r = s.get("https://wetransfer.com/") - m = re.search('name="csrf-token" content="([^"]+)"', r.text) - if m: - logger.debug(f"Setting x-csrf-token header to {m.group(1)}") - s.headers.update({"x-csrf-token": m.group(1)}) - else: - logger.debug("Could not find any csrf-token") - return s @@ -208,6 +194,8 @@ def _close_session(s: requests.Session) -> None: """ s.close() +def generate_random_uuid(): + return str(uuid.uuid4()) def _prepare_email_upload( filenames: List[str], @@ -222,37 +210,89 @@ def _prepare_email_upload( Return the parsed JSON response. """ + lsid = generate_random_uuid() + j = { + "downloader_email_verification": "anonymous", # magic parameter! "files": [_file_name_and_size(f) for f in filenames], "from": sender, + "lsid": lsid, "display_name": display_name, "message": message, "recipients": recipients, "ui_language": "en", } + r = session.post(WETRANSFER_API_URL, json=j) + r.raise_for_status() - r = session.post(WETRANSFER_UPLOAD_EMAIL_URL, json=j) - return r.json() + return {"sender": sender, "id": r.json()["id"], "lsid": lsid} + + +def generate_client_id(): + """ + Generates a random client_id in the same format as the example: + 32 characters consisting of uppercase letters, lowercase letters, and digits. + """ + characters = string.ascii_letters + string.digits + return ''.join(random.choice(characters) for _ in range(32)) def _verify_email_upload( - transfer_id: str, session: requests.Session -) -> Dict[Any, Any]: - """Given a transfer_id, read the code from standard input. + transfer_data: dict, session: requests.Session +): + """Given a transfer_data, read the code from standard input. Return the parsed JSON response. + """ - code = input("Code:") + logger.debug("send code to sender email") + c_id = generate_client_id() + data = { + "client_id": c_id, + "email": transfer_data["sender"] + } + url = "https://wetransfer.com/adroit/api/v1/login/passwordless" + response = requests.post(url, json=data) + response.raise_for_status() - j = { - "code": code, - "expire_in": WETRANSFER_EXPIRE_IN, + code = input("Confirmation code sent to your email:") + + headers = { + 'user-agent': WETRANSFER_USER_AGENT, + 'x-csrf-token': 'csrf-token' } - r = session.post( - WETRANSFER_VERIFY_URL.format(transfer_id=transfer_id), json=j + logger.debug("get access_token") + data = { + "grant_type": "http://auth0.com/oauth/grant-type/passwordless/otp", + "client_id": c_id, + "otp": code, + "realm": "email", + "username": transfer_data["sender"], + } + url = "https://auth.wetransfer.com/oauth/token" + response = requests.post(url, headers=headers, json=data) + response.raise_for_status() + tk = response.json()["access_token"] + + logger.debug("Comfirm upload") + headers = { + "authorization": f"Bearer {tk}", + "user-agent": WETRANSFER_USER_AGENT + } + payload = { + "expire_in": 259200, + "lsid": transfer_data["lsid"], + "segment_id": "grace_period" + } + response = requests.post( + WETRANSFER_UPLOAD_URL.format(transfer_id=transfer_data["id"]), + headers=headers, + json=payload ) - return r.json() + response.raise_for_status() + + return response.json() def _prepare_link_upload( @@ -266,13 +306,15 @@ def _prepare_link_upload( Return the parsed JSON response. """ j = { + "anonymous_transfer": True, # magic parameter! "files": [_file_name_and_size(f) for f in filenames], "display_name": display_name, "message": message, "ui_language": "en", } - r = session.post(WETRANSFER_UPLOAD_LINK_URL, json=j) + r = session.post(WETRANSFER_API_URL, json=j) + r.raise_for_status() return r.json() @@ -556,7 +598,7 @@ def upload( transfer = _prepare_email_upload( files, display_name, message, sender, recipients, s ) - transfer = _verify_email_upload(transfer["id"], s) + transfer = _verify_email_upload(transfer, s) else: # link upload transfer = _prepare_link_upload(files, display_name, message, s)