Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions desktop/core/src/desktop/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,12 @@
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': CACHES_HIVE_DISCOVERY_KEY
}
# Dedicated local-memory cache alias for WebHDFS delegation tokens.
CACHES_WEBHDFS_DELEGATION_TOKEN_KEY = 'webhdfs_delegation_token'
CACHES[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] = {
  'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
  'LOCATION': CACHES_WEBHDFS_DELEGATION_TOKEN_KEY,
  # Pass the configured number of seconds, not the config entry object.
  # Django coerces TIMEOUT with int() and silently falls back to its 300s
  # default when the coercion fails, which would ignore the configured value.
  # NOTE(review): assumes KERBEROS.REINIT_FREQUENCY is a desktop.conf config
  # entry read via .get(), like other desktop.conf settings -- confirm.
  'TIMEOUT': desktop.conf.KERBEROS.REINIT_FREQUENCY.get()
}

CACHES_CELERY_KEY = 'celery'
CACHES_CELERY_QUERY_RESULT_KEY = 'celery_query_results'
Expand Down
52 changes: 28 additions & 24 deletions desktop/libs/hadoop/src/hadoop/fs/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
Interfaces for Hadoop filesystem access via HttpFs/WebHDFS
"""

from future import standard_library
standard_library.install_aliases()
from builtins import oct
from builtins import object
from datetime import datetime, timedelta
import stat
import time
import errno
Expand All @@ -30,15 +35,20 @@
from builtins import object, oct
from urllib.parse import unquote as urllib_unquote, urlparse

from django.core.cache import caches
from django.http.multipartparser import MultiPartParser
from django.utils.encoding import smart_str
from django.utils.translation import gettext as _
from past.builtins import long

import hadoop.conf

import desktop.conf
from desktop.lib.rest import http_client, resource
from hadoop.fs import SEEK_CUR, SEEK_END, SEEK_SET, normpath as fs_normpath
from desktop.settings import CACHES_WEBHDFS_DELEGATION_TOKEN_KEY
from past.builtins import long
from hadoop.fs import normpath as fs_normpath, SEEK_SET, SEEK_CUR, SEEK_END
from hadoop.fs.hadoopfs import Hdfs
from hadoop.fs.exceptions import WebHdfsException
from hadoop.fs.hadoopfs import Hdfs
from hadoop.fs.webhdfs_types import WebHdfsContentSummary, WebHdfsStat
Expand All @@ -51,6 +61,8 @@

# Root logger: getLogger() is called without a name.
LOG = logging.getLogger()

# Django cache registered under the delegation-token alias. _getparams() reads
# and writes it with the user name as the key and the delegation token as value.
cache = caches[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY]


class WebHdfs(Hdfs):
"""
Expand Down Expand Up @@ -202,11 +214,26 @@ def current_trash_path(self, trash_path):
return self.join(trash_path, self.TRASH_CURRENT)

def _getparams(self):
  """
  Build the base query parameters sent with every WebHDFS request.

  With security enabled, a delegation token for ``self.user`` is looked up in
  the module-level ``cache`` and fetched through ``get_delegation_token`` on a
  miss. When a token is available, ``{'delegation': token}`` is returned on its
  own: the token already carries the user identity, so ``user.name``/``doas``
  are not sent with it.

  When security is disabled, or no token could be obtained, the
  ``user.name``/``doas`` impersonation parameters are returned instead.
  """
  if self._security_enabled:
    token = cache.get(self.user)
    if not token:
      token = self.get_delegation_token(self.user)
      # Only remember a usable token: caching an empty result would pin the
      # failure until the cache entry expires.
      if token:
        cache.set(self.user, token)
    # Never emit 'delegation' with an empty value; fall through to the
    # impersonation parameters instead.
    if token:
      return {'delegation': token}

  return {
    "user.name": WebHdfs.DEFAULT_USER,
    "doas": self.user
  }

def get_delegation_token(self, user):
  """
  Request a delegation token for ``user`` via the GETDELEGATIONTOKEN operation.

  ``user`` is sent both as the ``doas`` identity and as the ``renewer``.
  Returns the token's ``urlString``, or the falsy ``Token`` value itself when
  the response carries no token.
  """
  query = {
    'op': 'GETDELEGATIONTOKEN',
    'doas': user,
    'renewer': user,
  }
  response = self._root.get(params=query, headers=self._getheaders())

  token_info = response['Token']
  if not token_info:
    return token_info
  return token_info['urlString']

def _getheaders(self):
return None

Expand Down Expand Up @@ -547,16 +574,6 @@ def read_url(self, path, offset=0, length=None, bufsize=None):
params['length'] = long(length)
if bufsize is not None:
params['bufsize'] = bufsize
if self._security_enabled:
token = self.get_delegation_token(self.user)
if token:
params['delegation'] = token
# doas should not be present with delegation token as the token includes the username
# https://hadoop.apache.org/docs/r1.0.4/webhdfs.html
if 'doas' in params:
del params['doas']
if 'user.name' in params:
del params['user.name']
unquoted_path = urllib_unquote(smart_str(path))
return self._client._make_url(unquoted_path, params)

Expand Down Expand Up @@ -864,19 +881,6 @@ def _get_redirect_url(self, webhdfs_ex):
LOG.exception("Failed to read redirect from response: %s (%s)" % (webhdfs_ex, ex))
raise webhdfs_ex

def get_delegation_token(self, renewer):
  """
  get_delegation_token(renewer) -> Delegation token

  Issues a GETDELEGATIONTOKEN request on top of the parameters returned by
  ``_getparams()`` and returns the token's ``urlString`` (or the falsy
  ``Token`` value when the response carries none).
  """
  # NOTE(review): this definition shares its name with the earlier
  # get_delegation_token(self, user). If both stay in the class, this later one
  # is the one bound, and because it calls _getparams() -- which itself calls
  # get_delegation_token() on a cache miss when security is enabled -- the two
  # would recurse into each other. Presumably this is the pre-change version
  # that is meant to be removed; confirm.

  # Workaround for HDFS-3988
  if self._security_enabled:
    self.get_home_dir()

  query = self._getparams()
  query.update(op='GETDELEGATIONTOKEN', renewer=renewer)
  response = self._root.get(params=query, headers=self._getheaders())

  token_info = response['Token']
  if not token_info:
    return token_info
  return token_info['urlString']

def do_as_user(self, username, fn, *args, **kwargs):
prev_user = self.user
try:
Expand Down