-
Notifications
You must be signed in to change notification settings - Fork 64
Reap previously launched logviewer tasks #240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,11 +43,11 @@ | |
| import org.apache.mesos.Protos.Resource; | ||
| import org.apache.mesos.Protos.TaskID; | ||
| import org.apache.mesos.Protos.TaskInfo; | ||
| import org.apache.mesos.Protos.TaskState; | ||
| import org.apache.mesos.Protos.TaskStatus; | ||
| import org.apache.mesos.Protos.Value.Range; | ||
| import org.apache.mesos.Protos.Value.Ranges; | ||
| import org.apache.mesos.Protos.Value.Scalar; | ||
| import org.apache.mesos.Protos.TaskState; | ||
| import org.apache.mesos.SchedulerDriver; | ||
| import org.json.simple.JSONValue; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -60,8 +60,8 @@ | |
| import storm.mesos.resources.ResourceEntry; | ||
| import storm.mesos.resources.ResourceNotAvailableException; | ||
| import storm.mesos.resources.ResourceType; | ||
| import storm.mesos.schedulers.StormSchedulerImpl; | ||
| import storm.mesos.schedulers.IMesosStormScheduler; | ||
| import storm.mesos.schedulers.StormSchedulerImpl; | ||
| import storm.mesos.shims.CommandLineShimFactory; | ||
| import storm.mesos.shims.ICommandLineShim; | ||
| import storm.mesos.shims.LocalStateShim; | ||
|
|
@@ -91,8 +91,8 @@ | |
| import java.util.TimerTask; | ||
|
|
||
| import static storm.mesos.util.PrettyProtobuf.offerIDListToString; | ||
| import static storm.mesos.util.PrettyProtobuf.offerToString; | ||
| import static storm.mesos.util.PrettyProtobuf.offerMapToString; | ||
| import static storm.mesos.util.PrettyProtobuf.offerToString; | ||
| import static storm.mesos.util.PrettyProtobuf.taskInfoListToString; | ||
| import static storm.mesos.util.PrettyProtobuf.taskStatusListToTaskIDsString; | ||
|
|
||
|
|
@@ -222,30 +222,17 @@ void initializeMesosStormConf(Map conf, String localDir) { | |
| _disallowedHosts = listIntoSet((List<String>) conf.get(CONF_MESOS_DISALLOWED_HOSTS)); | ||
| _enabledLogviewerSidecar = MesosCommon.enabledLogviewerSidecar(conf); | ||
|
|
||
| if (_enabledLogviewerSidecar) { | ||
| Set<String> zkServerSet = listIntoSet((List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); | ||
| String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); | ||
| _logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; | ||
| LOG.info("Logviewer information will be stored under {}", _logviewerZkDir); | ||
|
|
||
| if (zkPort == null || zkServerSet == null) { | ||
| throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); | ||
| } else { | ||
| List<String> zkConnectionList = new ArrayList<>(); | ||
| for (String server : zkServerSet) { | ||
| zkConnectionList.add(String.format("%s:%s", server, zkPort)); | ||
| } | ||
| _zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); | ||
| } | ||
| } | ||
| initializeZkClient(conf); | ||
| _logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; | ||
| LOG.info("Logviewer ZK path: {}", _logviewerZkDir); | ||
|
|
||
| Boolean preferReservedResources = (Boolean) conf.get(CONF_MESOS_PREFER_RESERVED_RESOURCES); | ||
| if (preferReservedResources != null) { | ||
| _preferReservedResources = preferReservedResources; | ||
| } | ||
|
|
||
| _container = Optional.fromNullable((String) conf.get(CONF_MESOS_CONTAINER_DOCKER_IMAGE)); | ||
| _mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir); | ||
| _mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir, _enabledLogviewerSidecar); | ||
|
|
||
| // Generate YAML to be served up to clients | ||
| _generatedConfPath = Paths.get( | ||
|
|
@@ -272,6 +259,21 @@ void initializeMesosStormConf(Map conf, String localDir) { | |
| } | ||
| } | ||
|
|
||
| private void initializeZkClient(Map conf) { | ||
| Set<String> zkServerSet = listIntoSet((List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); | ||
| String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); | ||
|
|
||
| if (zkPort == null || zkServerSet == null) { | ||
| throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); | ||
| } else { | ||
| List<String> zkConnectionList = new ArrayList<>(); | ||
| for (String server : zkServerSet) { | ||
| zkConnectionList.add(String.format("%s:%s", server, zkPort)); | ||
| } | ||
| _zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| protected void startLocalHttpServer() throws Exception { | ||
| createLocalServerPort(); | ||
|
|
@@ -286,34 +288,32 @@ public void doRegistration(final SchedulerDriver driver, Protos.FrameworkID id) | |
| _state.put(FRAMEWORK_ID, id.getValue()); | ||
| _offers = new HashMap<Protos.OfferID, Protos.Offer>(); | ||
|
|
||
| if (_enabledLogviewerSidecar) { | ||
|
|
||
| _timer.scheduleAtFixedRate(new TimerTask() { | ||
| @Override | ||
| public void run() { | ||
| // performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks | ||
| // in the framework scheduler's statusUpdate() method | ||
| List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>(); | ||
| List<String> logviewerPaths = _zkClient.getChildren(_logviewerZkDir); | ||
| if (logviewerPaths == null) { | ||
| _driver.reconcileTasks(taskStatuses); | ||
| return; | ||
| } | ||
| for (String path : logviewerPaths) { | ||
| TaskID logviewerTaskId = TaskID.newBuilder() | ||
| .setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) | ||
| .build(); | ||
| TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() | ||
| .setTaskId(logviewerTaskId) | ||
| .setState(TaskState.TASK_RUNNING) | ||
| .build(); | ||
| taskStatuses.add(logviewerTaskStatus); | ||
| } | ||
| _timer.scheduleAtFixedRate(new TimerTask() { | ||
| @Override | ||
| public void run() { | ||
| // performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks | ||
| // in the framework scheduler's statusUpdate() method | ||
| List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>(); | ||
| List<String> logviewerPaths = _zkClient.getChildren(_logviewerZkDir); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the line of code that is generating the log line that you mention as the visible side-effect of this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. The more I think about this (and having seen the code run on our systems), I think this trace will be a nuisance, given how frequently the timer task runs (every 5 mins). I'm in favor of adding an explicit check to first test that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the incredibly slow response. I like this solution, please implement. |
||
| if (logviewerPaths == null || !_enabledLogviewerSidecar) { | ||
| _driver.reconcileTasks(taskStatuses); | ||
| LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); | ||
| return; | ||
| } | ||
| }, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes | ||
| } | ||
|
|
||
| for (String path : logviewerPaths) { | ||
| TaskID logviewerTaskId = TaskID.newBuilder() | ||
| .setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) | ||
| .build(); | ||
| TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() | ||
| .setTaskId(logviewerTaskId) | ||
| .setState(TaskState.TASK_RUNNING) | ||
| .build(); | ||
| taskStatuses.add(logviewerTaskStatus); | ||
| } | ||
| _driver.reconcileTasks(taskStatuses); | ||
| LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); | ||
| } | ||
| }, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes | ||
| } | ||
|
|
||
| public void shutdown() throws Exception { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,7 @@ | |
| */ | ||
| package storm.mesos; | ||
|
|
||
| import org.apache.mesos.Protos.ExecutorID; | ||
| import org.apache.mesos.Protos.FrameworkID; | ||
| import org.apache.mesos.Protos.MasterInfo; | ||
| import org.apache.mesos.Protos.Offer; | ||
| import org.apache.mesos.Protos.OfferID; | ||
| import org.apache.mesos.Protos.SlaveID; | ||
| import org.apache.mesos.Protos.TaskStatus; | ||
| import org.apache.mesos.Protos.*; | ||
|
||
| import org.apache.mesos.Scheduler; | ||
| import org.apache.mesos.SchedulerDriver; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -42,12 +36,14 @@ public class NimbusMesosScheduler implements Scheduler { | |
| private ZKClient zkClient; | ||
| private String logviewerZkDir; | ||
| private CountDownLatch _registeredLatch = new CountDownLatch(1); | ||
| private boolean enableLogViewers; | ||
| public static final Logger LOG = LoggerFactory.getLogger(MesosNimbus.class); | ||
|
|
||
| public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir) { | ||
| public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir, boolean enableLogViewers) { | ||
| this.mesosNimbus = mesosNimbus; | ||
| this.zkClient = zkClient; | ||
| this.logviewerZkDir = logviewerZkDir; | ||
| this.enableLogViewers = enableLogViewers; | ||
| } | ||
|
|
||
| public void waitUntilRegistered() throws InterruptedException { | ||
|
|
@@ -98,6 +94,7 @@ public void statusUpdate(SchedulerDriver driver, TaskStatus status) { | |
| if (status.getTaskId().getValue().contains("logviewer")) { | ||
| updateLogviewerState(status); | ||
| } | ||
|
|
||
|
||
| switch (status.getState()) { | ||
| case TASK_STAGING: | ||
| case TASK_STARTING: | ||
|
|
@@ -125,25 +122,34 @@ private void updateLogviewerState(TaskStatus status) { | |
| LOG.error("updateLogviewerState: taskId for logviewer, {}, isn't formatted correctly so ignoring task update", taskId); | ||
| return; | ||
| } | ||
|
|
||
| String nodeId = taskId.split("\\" + MesosCommon.MESOS_COMPONENT_ID_DELIMITER)[1]; | ||
| String logviewerZKPath = String.format("%s/%s", logviewerZkDir, nodeId); | ||
|
|
||
| if (!enableLogViewers) { | ||
| LOG.info("Logviewers are disabled. Reaping existing logviewer task {}", taskId); | ||
| reapLogviewerTask(logviewerZKPath, status); | ||
| return; | ||
| } | ||
|
|
||
| switch (status.getState()) { | ||
| case TASK_STAGING: | ||
| checkRunningLogviewerState(logviewerZKPath); | ||
| ensureZNodeExists(logviewerZKPath); | ||
| return; | ||
| case TASK_STARTING: | ||
| checkRunningLogviewerState(logviewerZKPath); | ||
| ensureZNodeExists(logviewerZKPath); | ||
| return; | ||
| case TASK_RUNNING: | ||
| checkRunningLogviewerState(logviewerZKPath); | ||
| ensureZNodeExists(logviewerZKPath); | ||
| return; | ||
| case TASK_LOST: | ||
| // this status update can be triggered by the explicit kill and isn't terminal, do not kill again | ||
| break; | ||
| default: | ||
| // explicitly kill the logviewer task to ensure logviewer is terminated | ||
| // explicitly kill the logviewer task to ensure it is terminated | ||
| mesosNimbus._driver.killTask(status.getTaskId()); | ||
| } | ||
|
|
||
| // if it gets to this point it means logviewer terminated; update ZK with new logviewer state | ||
| if (zkClient.nodeExists(logviewerZKPath)) { | ||
| LOG.info("updateLogviewerState: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId); | ||
|
|
@@ -154,13 +160,34 @@ private void updateLogviewerState(TaskStatus status) { | |
| } | ||
| } | ||
|
|
||
| private void checkRunningLogviewerState(String logviewerZKPath) { | ||
| private void ensureZNodeExists(String logviewerZKPath) { | ||
|
||
| if (!zkClient.nodeExists(logviewerZKPath)) { | ||
| LOG.error("checkRunningLogviewerState: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); | ||
| LOG.warn("ensureZNodeExists: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper"); | ||
| zkClient.createNode(logviewerZKPath); | ||
| } | ||
| } | ||
|
|
||
| private void reapLogviewerTask(String logviewerZKPath, TaskStatus status) { | ||
| String taskId = status.getTaskId().getValue(); | ||
| if (zkClient.nodeExists(logviewerZKPath)) { | ||
| LOG.info("reapLogviewerTask: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId); | ||
| zkClient.deleteNode(logviewerZKPath); | ||
| } | ||
|
|
||
| switch (status.getState()) { | ||
| case TASK_FAILED: | ||
| case TASK_FINISHED: | ||
| case TASK_KILLED: | ||
| case TASK_LOST: | ||
| // terminal states | ||
| break; | ||
| default: | ||
| // explicitly kill the logviewer task to ensure it is terminated | ||
| LOG.info("reapLogviewerTask: Killing logviewer mesos task {}", logviewerZKPath, taskId); | ||
| mesosNimbus._driver.killTask(status.getTaskId()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void frameworkMessage(SchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, byte[] data) { | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align this with the rest of the code please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually aligned with the rest of the code. The diff makes it seem like it isn't. I'm taking the timer thread out of the
if (_enabledLogviewerSidecar)block and scheduling it unconditionally. See https://github.com/srikanth-viswanathan/mesos-storm/blob/66dcbbe502aa3438b2a1b107d146e25de058f82c/storm/src/main/storm/mesos/MesosNimbus.java#L291 for the full file.