Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 75 additions & 33 deletions transferwee.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#!/usr/bin/env python3

#
# Copyright (c) 2018-2023 Leonardo Taccari
#!/usr/bin/env python3 # # Copyright (c) 2018-2023 Leonardo Taccari
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -37,7 +34,9 @@
files from a `we.tl' or `wetransfer.com/downloads' URLs and upload files that
will be shared via emails or link.
"""

import random
import string
import uuid
from typing import Any, Dict, List, Optional, Union
import binascii
import functools
Expand All @@ -54,9 +53,8 @@

WETRANSFER_API_URL = "https://wetransfer.com/api/v4/transfers"
WETRANSFER_DOWNLOAD_URL = WETRANSFER_API_URL + "/{transfer_id}/download"
WETRANSFER_UPLOAD_EMAIL_URL = WETRANSFER_API_URL + "/email"
WETRANSFER_UPLOAD_URL = WETRANSFER_API_URL + "/{transfer_id}/passwordless"
WETRANSFER_VERIFY_URL = WETRANSFER_API_URL + "/{transfer_id}/verify"
WETRANSFER_UPLOAD_LINK_URL = WETRANSFER_API_URL + "/link"
WETRANSFER_FINALIZE_URL = WETRANSFER_API_URL + "/{transfer_id}/finalize"

WETRANSFER_EXPIRE_IN = 604800
Expand Down Expand Up @@ -178,10 +176,6 @@ def _file_name_and_size(file: str) -> Dict[str, Union[int, str]]:

def _prepare_session() -> Optional[requests.Session]:
"""Prepare a wetransfer.com session.

Return a requests session that will always pass the required headers
and with cookies properly populated that can be used for wetransfer
requests.
"""
s = requests.Session()
s.headers.update(
Expand All @@ -190,14 +184,6 @@ def _prepare_session() -> Optional[requests.Session]:
"x-requested-with": "XMLHttpRequest",
}
)
r = s.get("https://wetransfer.com/")
m = re.search('name="csrf-token" content="([^"]+)"', r.text)
if m:
logger.debug(f"Setting x-csrf-token header to {m.group(1)}")
s.headers.update({"x-csrf-token": m.group(1)})
else:
logger.debug("Could not find any csrf-token")

return s


Expand All @@ -208,6 +194,8 @@ def _close_session(s: requests.Session) -> None:
"""
s.close()

def generate_random_uuid():
return str(uuid.uuid4())

def _prepare_email_upload(
filenames: List[str],
Expand All @@ -222,37 +210,89 @@ def _prepare_email_upload(

Return the parsed JSON response.
"""
lsid = generate_random_uuid()

j = {
"downloader_email_verification": "anonymous", # magic parameter!
"files": [_file_name_and_size(f) for f in filenames],
"from": sender,
"lsid": lsid,
"display_name": display_name,
"message": message,
"recipients": recipients,
"ui_language": "en",
}
r = session.post(WETRANSFER_API_URL, json=j)
r.raise_for_status()

r = session.post(WETRANSFER_UPLOAD_EMAIL_URL, json=j)
return r.json()
return {"sender": sender, "id": r.json()["id"], "lsid": lsid}


def generate_client_id():
"""
Generates a random client_id in the same format as the example:
32 characters consisting of uppercase letters, lowercase letters, and digits.
"""
characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(32))


def _verify_email_upload(
transfer_id: str, session: requests.Session
) -> Dict[Any, Any]:
"""Given a transfer_id, read the code from standard input.
transfer_data: dict, session: requests.Session
):
"""Given a transfer_data, read the code from standard input.

Return the parsed JSON response.

"""
code = input("Code:")
logger.debug("send code to sender email")
c_id = generate_client_id()
data = {
"client_id": c_id,
"email": transfer_data["sender"]
}
url = "https://wetransfer.com/adroit/api/v1/login/passwordless"
response = requests.post(url, json=data)
response.raise_for_status()

j = {
"code": code,
"expire_in": WETRANSFER_EXPIRE_IN,
code = input("Confirmation code sent to your email:")

headers = {
'user-agent': WETRANSFER_USER_AGENT,
'x-csrf-token': 'csrf-token'
}

r = session.post(
WETRANSFER_VERIFY_URL.format(transfer_id=transfer_id), json=j
logger.debug("get access_token")
data = {
"grant_type": "http://auth0.com/oauth/grant-type/passwordless/otp",
"client_id": c_id,
"otp": code,
"realm": "email",
"username": transfer_data["sender"],
}
url = "https://auth.wetransfer.com/oauth/token"
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
tk = response.json()["access_token"]

logger.debug("Comfirm upload")
headers = {
"authorization": f"Bearer {tk}",
"user-agent": WETRANSFER_USER_AGENT
}
payload = {
"expire_in": 259200,
"lsid": transfer_data["lsid"],
"segment_id": "grace_period"
}
response = requests.post(
WETRANSFER_UPLOAD_URL.format(transfer_id=transfer_data["id"]),
headers=headers,
json=payload
)
return r.json()
response.raise_for_status()

return response.json()


def _prepare_link_upload(
Expand All @@ -266,13 +306,15 @@ def _prepare_link_upload(
Return the parsed JSON response.
"""
j = {
"anonymous_transfer": True, # magic parameter!
"files": [_file_name_and_size(f) for f in filenames],
"display_name": display_name,
"message": message,
"ui_language": "en",
}

r = session.post(WETRANSFER_UPLOAD_LINK_URL, json=j)
r = session.post(WETRANSFER_API_URL, json=j)
r.raise_for_status()
return r.json()


Expand Down Expand Up @@ -556,7 +598,7 @@ def upload(
transfer = _prepare_email_upload(
files, display_name, message, sender, recipients, s
)
transfer = _verify_email_upload(transfer["id"], s)
transfer = _verify_email_upload(transfer, s)
else:
# link upload
transfer = _prepare_link_upload(files, display_name, message, s)
Expand Down