Flink -- Failover
Published: 2019-06-28


JobManager Failover

LeaderLatch

```java
private synchronized void setLeadership(boolean newValue) {
    boolean oldValue = hasLeadership.getAndSet(newValue);

    if (oldValue && !newValue) {
        // Lost leadership: was the leader (true), now is not (false)
        listeners.forEach(new Function<LeaderLatchListener, Void>() {
            @Override
            public Void apply(LeaderLatchListener listener) {
                listener.notLeader();
                return null;
            }
        });
    } else if (!oldValue && newValue) {
        // Gained leadership: was false, now true
        listeners.forEach(new Function<LeaderLatchListener, Void>() {
            @Override
            public Void apply(LeaderLatchListener input) {
                input.isLeader();
                return null;
            }
        });
    }

    notifyAll();
}
```

ZooKeeperLeaderElectionService

```java
@Override
public void isLeader() {
    synchronized (lock) {
        issuedLeaderSessionID = UUID.randomUUID();
        leaderContender.grantLeadership(issuedLeaderSessionID);
    }
}

@Override
public void notLeader() {
    synchronized (lock) {
        issuedLeaderSessionID = null;
        confirmedLeaderSessionID = null;
        leaderContender.revokeLeadership();
    }
}
```

As you can see, these two callbacks simply call leaderContender.grantLeadership and leaderContender.revokeLeadership, respectively.
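The edge-triggered behaviour above (callbacks fire only when leadership actually changes, and every grant carries a fresh session ID) can be sketched in plain Java. This is not Curator or Flink code: Contender, SketchElectionService and RecordingContender are hypothetical names, and the sketch folds LeaderLatch's edge detection and the election service's session-ID handling into a single class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical contender callbacks, in the spirit of LeaderContender.
interface Contender {
    void grantLeadership(UUID sessionId);
    void revokeLeadership();
}

// Edge-triggered election sketch: callbacks fire only when leadership changes.
class SketchElectionService {
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final Contender contender;
    private UUID issuedSessionId; // null while not the leader

    SketchElectionService(Contender contender) { this.contender = contender; }

    synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            issuedSessionId = null;                 // lost leadership
            contender.revokeLeadership();
        } else if (!oldValue && newValue) {
            issuedSessionId = UUID.randomUUID();    // gained leadership: fresh session ID
            contender.grantLeadership(issuedSessionId);
        }
        // setting the same value again triggers nothing
    }

    synchronized UUID currentSessionId() { return issuedSessionId; }
}

// Records the callbacks it receives.
class RecordingContender implements Contender {
    final List<String> events = new ArrayList<>();

    @Override public void grantLeadership(UUID sessionId) { events.add("grant"); }
    @Override public void revokeLeadership() { events.add("revoke"); }
}
```

Calling setLeadership(true) twice yields a single grant, so in this model the contender never sees duplicate grantLeadership calls.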

 

The JobManager implements the LeaderContender interface, and its two callbacks do the following.

revokeLeadership

```scala
val newFuturesToComplete = cancelAndClearEverything(
  new Exception("JobManager is no longer the leader."))
```

The key step in cancelAndClearEverything is suspending the ExecutionGraph: this stops the job's execution but does not delete the job, so another JobManager can still resubmit it.

```scala
/*
 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
 * not remove the job from the HA job store so that it can be recovered by another JobManager.
 */
private def cancelAndClearEverything(cause: Throwable): Seq[Future[Unit]] = {
  val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
    future {
      eg.suspend(cause) // suspend the ExecutionGraph

      if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
        jobInfo.client ! decorateMessage(
          Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
      }
    }(context.dispatcher)
  }

  currentJobs.clear()

  futures.toSeq
}
```
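To make the difference between suspending and cancelling concrete, here is a minimal sketch that assumes a map-backed stand-in for the HA job store (in Flink, the submittedJobGraphs store is backed by ZooKeeper). SketchJobStore and SketchJobManager are hypothetical; the only point is that suspending clears the JobManager's local job table while the stored job graph stays behind for the next leader to recover.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical HA job store (jobId -> serialized job graph).
class SketchJobStore {
    private final Map<String, String> graphs = new ConcurrentHashMap<>();

    void put(String jobId, String jobGraph) { graphs.put(jobId, jobGraph); }
    void remove(String jobId) { graphs.remove(jobId); }
    Collection<String> recoverJobIds() { return graphs.keySet(); }
}

// Hypothetical JobManager keeping a local table of running jobs.
class SketchJobManager {
    private final Map<String, String> currentJobs = new ConcurrentHashMap<>();
    private final SketchJobStore store;

    SketchJobManager(SketchJobStore store) { this.store = store; }

    void submit(String jobId, String jobGraph) {
        store.put(jobId, jobGraph);      // persisted so another JobManager can recover it
        currentJobs.put(jobId, jobGraph);
    }

    // Leadership revoked: suspend everything locally, keep the HA entries.
    void suspendAll() { currentJobs.clear(); }

    // Cancellation is globally terminal: drop the job locally and from the HA store.
    void cancel(String jobId) {
        currentJobs.remove(jobId);
        store.remove(jobId);
    }

    int runningJobs() { return currentJobs.size(); }
}
```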

 

grantLeadership

```scala
context.system.scheduler.scheduleOnce(
  jobRecoveryTimeout,
  self,
  decorateMessage(RecoverAllJobs))(
  context.dispatcher)
```

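Its main purpose is to recover all jobs: after jobRecoveryTimeout, the JobManager sends itself a RecoverAllJobs message, which is handled as follows: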

```scala
case RecoverAllJobs =>
  future {
    try {
      // The ActorRef, which is part of the submitted job graph can only be
      // de-serialized in the scope of an actor system.
      akka.serialization.JavaSerializer.currentSystem.withValue(
        context.system.asInstanceOf[ExtendedActorSystem]) {

        log.info(s"Attempting to recover all jobs.")

        // read all submitted jobs from the submittedJobGraphs store, i.e. from ZooKeeper
        val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala

        if (!leaderElectionService.hasLeadership()) {
          // we've lost leadership. mission: abort.
          log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
            s"jobs.")
        } else {
          log.info(s"Re-submitting ${jobGraphs.size} job graphs.")

          jobGraphs.foreach {
            submittedJobGraph =>
              self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) // recover the job
          }
        }
      }
    } catch {
      case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
    }
  }(context.dispatcher)
```

Recovering an individual job:

```scala
case RecoverSubmittedJob(submittedJobGraph) =>
  if (!currentJobs.contains(submittedJobGraph.getJobId)) {
    submitJob(
      submittedJobGraph.getJobGraph(),
      submittedJobGraph.getJobInfo(),
      isRecovery = true)
  } else {
    log.info(s"Ignoring job recovery for ${submittedJobGraph.getJobId}, " +
      s"because it is already submitted.")
  }
```

This simply resubmits the job; note the isRecovery = true argument.
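For illustration, the two handlers above can be condensed into one method. In this hypothetical sketch the leadership check happens once before anything is resubmitted, and job IDs already present in currentJobs are skipped; in the real JobManager these two steps live in separate message handlers, and SketchRecovery is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical condensation of RecoverAllJobs + RecoverSubmittedJob.
class SketchRecovery {
    // Returns the job IDs that were actually resubmitted.
    static List<String> recoverAllJobs(List<String> storedJobIds,
                                       Set<String> currentJobs,
                                       BooleanSupplier hasLeadership) {
        List<String> resubmitted = new ArrayList<>();
        if (!hasLeadership.getAsBoolean()) {
            return resubmitted; // lost leadership during recovery: abort
        }
        for (String jobId : storedJobIds) {
            if (currentJobs.contains(jobId)) {
                continue; // already submitted: ignore
            }
            currentJobs.add(jobId); // stands in for submitJob(..., isRecovery = true)
            resubmitted.add(jobId);
        }
        return resubmitted;
    }
}
```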

When submitJob is called with isRecovery = true, it additionally does the following; for what happens after that, see the Checkpoint article.

```scala
if (isRecovery) {
  executionGraph.restoreLatestCheckpointedState()
}
```
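What isRecovery changes can be sketched with a hypothetical store of completed checkpoints: a fresh submission starts from empty state, while a recovered job starts from the most recent completed checkpoint. SketchCheckpointStore and SketchSubmit are illustrative names, not Flink classes, and task state is reduced to a map from task name to an offset.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;

// Hypothetical store of completed checkpoints, keeping at most maxRetained of them.
class SketchCheckpointStore {
    private final Deque<Map<String, Long>> completed = new ArrayDeque<>();
    private final int maxRetained;

    SketchCheckpointStore(int maxRetained) { this.maxRetained = maxRetained; }

    void addCheckpoint(Map<String, Long> taskStates) {
        completed.addLast(taskStates);
        while (completed.size() > maxRetained) {
            completed.removeFirst(); // drop the oldest checkpoint
        }
    }

    Optional<Map<String, Long>> latest() {
        return Optional.ofNullable(completed.peekLast());
    }
}

class SketchSubmit {
    // State the tasks start from: empty for a fresh submission, the latest
    // completed checkpoint (if any) when isRecovery is true.
    static Map<String, Long> initialState(SketchCheckpointStore store, boolean isRecovery) {
        if (isRecovery) {
            return store.latest().orElse(Collections.<String, Long>emptyMap());
        }
        return Collections.emptyMap();
    }
}
```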

 

TaskManager Failover

Inside the JobManager, a dead TaskManager is detected through Akka death watch:

```scala
/**
  * Handler to be executed when a task manager terminates.
  * (Akka Deathwatch or notification from ResourceManager)
  *
  * @param taskManager The ActorRef of the taskManager
  */
private def handleTaskManagerTerminated(taskManager: ActorRef): Unit = {
  if (instanceManager.isRegistered(taskManager)) {
    log.info(s"Task manager ${taskManager.path} terminated.")

    instanceManager.unregisterTaskManager(taskManager, true)
    context.unwatch(taskManager)
  }
}
```

This calls instanceManager.unregisterTaskManager:

```java
/**
 * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
 * the given instance as dead and notify {@link InstanceListener} about the dead instance.
 *
 * @param instanceID TaskManager which is about to be marked dead.
 */
public void unregisterTaskManager(ActorRef instanceID, boolean terminated) {
    Instance instance = registeredHostsByConnection.get(instanceID);

    if (instance != null) {
        ActorRef host = instance.getActorGateway().actor();

        registeredHostsByConnection.remove(host);
        registeredHostsById.remove(instance.getId());
        registeredHostsByResource.remove(instance.getResourceId());

        if (terminated) {
            deadHosts.add(instance.getActorGateway().actor());
        }

        instance.markDead();

        totalNumberOfAliveTaskSlots -= instance.getTotalNumberOfSlots();

        notifyDeadInstance(instance);
    }
}
```

instance.markDead()

```java
public void markDead() {
    // create a copy of the slots to avoid concurrent modification exceptions
    List<Slot> slots;

    synchronized (instanceLock) {
        if (isDead) {
            return;
        }
        isDead = true;

        // no more notifications for the slot releasing
        this.slotAvailabilityListener = null;

        slots = new ArrayList<Slot>(allocatedSlots);

        allocatedSlots.clear();
        availableSlots.clear();
    }

    /*
     * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
     * owning the assignment group lock wants to give itself back to the instance which requires
     * the instance lock
     */
    for (Slot slot : slots) {
        slot.releaseSlot();
    }
}
```
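The pattern in markDead (mark the instance dead only once, copy the slot list while holding the lock, then release the slots after dropping the lock) can be shown in isolation. This is a hypothetical sketch: SketchInstance and SketchSlot are made up, and releasing a slot merely records the task that would have been failed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical slot: releasing it "fails" the task it runs by recording its name.
class SketchSlot {
    private final String task; // null if the slot is idle
    private boolean released;

    SketchSlot(String task) { this.task = task; }

    synchronized void releaseSlot(List<String> failedTasks) {
        if (!released) {
            released = true;
            if (task != null) {
                failedTasks.add(task); // stands in for exec.fail(...)
            }
        }
    }
}

// Hypothetical instance following the markDead pattern.
class SketchInstance {
    private final Object instanceLock = new Object();
    private final List<SketchSlot> allocatedSlots = new ArrayList<>();
    private boolean isDead;

    void allocate(SketchSlot slot) {
        synchronized (instanceLock) {
            allocatedSlots.add(slot);
        }
    }

    void markDead(List<String> failedTasks) {
        List<SketchSlot> slots;
        synchronized (instanceLock) {
            if (isDead) {
                return; // already dead: nothing to do
            }
            isDead = true;
            slots = new ArrayList<>(allocatedSlots); // copy while holding the lock
            allocatedSlots.clear();
        }
        // release outside instanceLock, so a slot that needs the lock cannot deadlock with us
        for (SketchSlot slot : slots) {
            slot.releaseSlot(failedTasks);
        }
    }
}
```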

 

SimpleSlot.releaseSlot

```java
@Override
public void releaseSlot() {
    if (!isCanceled()) {
        // kill all tasks currently running in this slot
        Execution exec = this.executedTask;
        if (exec != null && !exec.isFinished()) {
            exec.fail(new Exception(
                    "The slot in which the task was executed has been released. Probably loss of TaskManager "
                            + getInstance()));
        }

        // release directly (if we are directly allocated),
        // otherwise release through the parent shared slot
        if (getParent() == null) {
            // we have to give back the slot to the owning instance
            if (markCancelled()) {
                getInstance().returnAllocatedSlot(this);
            }
        } else {
            // we have to ask our parent to dispose us
            getParent().releaseChild(this);
        }
    }
}
```

Execution.fail

```java
public void fail(Throwable t) {
    processFail(t, false);
}
```

Execution.processFail

First, the Execution's state is set to FAILED:

```java
// in processFail:
transitionState(current, FAILED, t);

// Execution.transitionState:
private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
    if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
        markTimestamp(targetState);

        try {
            vertex.notifyStateTransition(attemptId, targetState, error);
        }
        catch (Throwable t) {
            LOG.error("Error while notifying execution graph of execution state transition.", t);
        }
        return true;
    } else {
        return false;
    }
}
```
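transitionState relies on a compare-and-set, so when several threads race to move an Execution out of the same state, exactly one wins and only that one notifies the vertex. Below is a minimal sketch using java.util.concurrent.atomic.AtomicReferenceFieldUpdater, the JDK class STATE_UPDATER is presumably an instance of; SketchExecution and SketchState are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

enum SketchState { CREATED, RUNNING, FAILED, CANCELED }

// Hypothetical execution whose state changes only through compare-and-set.
class SketchExecution {
    private static final AtomicReferenceFieldUpdater<SketchExecution, SketchState> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(SketchExecution.class, SketchState.class, "state");

    private volatile SketchState state = SketchState.RUNNING;

    // Counts successful transitions; stands in for vertex.notifyStateTransition(...)
    private final AtomicInteger notifications = new AtomicInteger();

    boolean transitionState(SketchState currentState, SketchState targetState) {
        if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
            notifications.incrementAndGet(); // only the winning caller gets here
            return true;
        }
        return false; // state was no longer currentState
    }

    SketchState state() { return state; }
    int notifications() { return notifications.get(); }
}
```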

Once the state has been set, notifyStateTransition is called, which forwards the change to the ExecutionGraph:

```java
getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);

void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
                           ExecutionState newExecutionState, Throwable error) {
    ExecutionJobVertex vertex = getJobVertex(vertexId);

    if (executionListenerActors.size() > 0) {
        String message = error == null ? null : ExceptionUtils.stringifyException(error);
        ExecutionGraphMessages.ExecutionStateChanged actorMessage =
                new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId, vertex.getJobVertex().getName(),
                                                                vertex.getParallelism(), subtask,
                                                                executionID, newExecutionState,
                                                                System.currentTimeMillis(), message);

        for (ActorGateway listener : executionListenerActors) {
            listener.tell(actorMessage);
        }
    }

    // see what this means for us. currently, the first FAILED state means -> FAILED
    if (newExecutionState == ExecutionState.FAILED) {
        fail(error);
    }
}
```

Its main job is to send an ExecutionGraphMessages.ExecutionStateChanged message to every registered listener.

The listeners are registered by the JobManager when the job is submitted:

```scala
if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
  // the sender wants to be notified about state changes
  val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)

  executionGraph.registerExecutionListener(gateway)
  executionGraph.registerJobStatusListener(gateway)
}
```

On the client side, JobClientActor merely logs and prints these messages:
```java
if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
    logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
    logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
}
```
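The register-at-submission, broadcast-on-change flow is essentially an observer. In this hypothetical sketch SketchGraphListeners, SketchListening and SketchStateChanged are made-up names and a Consumer stands in for the client's ActorGateway; only clients that asked for state changes when submitting receive messages.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

enum SketchListening { DETACHED, EXECUTION_RESULT_AND_STATE_CHANGES }

// Hypothetical stand-in for ExecutionGraphMessages.ExecutionStateChanged.
final class SketchStateChanged {
    final String vertexName;
    final int subtask;
    final String newState;

    SketchStateChanged(String vertexName, int subtask, String newState) {
        this.vertexName = vertexName;
        this.subtask = subtask;
        this.newState = newState;
    }

    @Override
    public String toString() {
        return vertexName + " (" + subtask + ") switched to " + newState;
    }
}

class SketchGraphListeners {
    // copy-on-write so notifications can iterate safely while another listener registers
    private final List<Consumer<SketchStateChanged>> listeners = new CopyOnWriteArrayList<>();

    // Called at submission time: only clients that asked for state changes are registered.
    void registerOnSubmit(SketchListening behaviour, Consumer<SketchStateChanged> client) {
        if (behaviour == SketchListening.EXECUTION_RESULT_AND_STATE_CHANGES) {
            listeners.add(client);
        }
    }

    // Builds one message and hands it to every listener (fire-and-forget, like tell()).
    void notifyExecutionChange(String vertexName, int subtask, String newState) {
        if (listeners.isEmpty()) {
            return;
        }
        SketchStateChanged message = new SketchStateChanged(vertexName, subtask, newState);
        for (Consumer<SketchStateChanged> listener : listeners) {
            listener.accept(message);
        }
    }
}
```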

 

Note that when newExecutionState == ExecutionState.FAILED, ExecutionGraph.fail is called.

As the code comment says, the first FAILED execution means the whole job fails:

```java
public void fail(Throwable t) {
    while (true) {
        JobStatus current = state;
        // stay in these states
        if (current == JobStatus.FAILING ||
            current == JobStatus.SUSPENDED ||
            current.isGloballyTerminalState()) {
            return;
        } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
            synchronized (progressLock) {
                postRunCleanup();
                progressLock.notifyAll();
                return;
            }
        } else if (transitionState(current, JobStatus.FAILING, t)) { // set the job status to FAILING
            this.failureCause = t;

            if (!verticesInCreationOrder.isEmpty()) {
                // cancel all. what is failed will not cancel but stay failed
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                    ejv.cancel();
                }
            } else {
                // set the state of the job to failed
                transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
            }

            return;
        }
    }
}
```

So the job status is set straight to FAILING, and cancel is called on every ExecutionJobVertex.
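ExecutionGraph.fail is a classic lock-free retry loop: read the status, return if there is nothing to do, otherwise attempt a compare-and-set and start over if another thread changed the status in between. A hypothetical sketch using AtomicReference, in which cancelling a vertex is reduced to a counter; SketchJobStatus and SketchGraph are made-up names.

```java
import java.util.concurrent.atomic.AtomicReference;

enum SketchJobStatus {
    CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED;

    boolean isGloballyTerminal() {
        return this == FAILED || this == CANCELED || this == FINISHED;
    }
}

// Hypothetical graph reduced to a status and a vertex count.
class SketchGraph {
    final AtomicReference<SketchJobStatus> state = new AtomicReference<>(SketchJobStatus.RUNNING);
    final int numVertices;
    int cancelCalls; // stands in for ejv.cancel() on each vertex

    SketchGraph(int numVertices) { this.numVertices = numVertices; }

    void fail() {
        while (true) {
            SketchJobStatus current = state.get();
            if (current == SketchJobStatus.FAILING
                    || current == SketchJobStatus.SUSPENDED
                    || current.isGloballyTerminal()) {
                return; // stay in these states
            } else if (current == SketchJobStatus.RESTARTING
                    && state.compareAndSet(current, SketchJobStatus.FAILED)) {
                return; // a failure while RESTARTING fails the job outright
            } else if (state.compareAndSet(current, SketchJobStatus.FAILING)) {
                if (numVertices > 0) {
                    cancelCalls += numVertices; // cancel every vertex
                } else {
                    state.compareAndSet(SketchJobStatus.FAILING, SketchJobStatus.FAILED);
                }
                return;
            }
            // a concurrent transition won the race: re-read the status and retry
        }
    }
}
```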

 

Next, the execution is deregistered from the ExecutionGraph:

```java
vertex.getExecutionGraph().deregisterExecution(this);

// inside ExecutionGraph.deregisterExecution:
Execution contained = currentExecutions.remove(exec.getAttemptId());
```

Finally, it calls:

```java
vertex.executionFailed(t);

void executionFailed(Throwable t) {
    jobVertex.vertexFailed(subTaskIndex, t);
}
```

ExecutionJobVertex

```java
void vertexFailed(int subtask, Throwable error) {
    subtaskInFinalState(subtask);
}

private void subtaskInFinalState(int subtask) {
    synchronized (stateMonitor) {
        if (!finishedSubtasks[subtask]) {
            finishedSubtasks[subtask] = true;

            if (numSubtasksInFinalState + 1 == parallelism) { // have all subtasks of this vertex reached a final state?

                // call finalizeOnMaster hook
                try {
                    getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
                }
                catch (Throwable t) {
                    getGraph().fail(t);
                }

                numSubtasksInFinalState++;

                // we are in our final state
                stateMonitor.notifyAll();

                // tell the graph
                graph.jobVertexInFinalState();
            } else {
                numSubtasksInFinalState++;
            }
        }
    }
}
```

graph.jobVertexInFinalState()

```java
void jobVertexInFinalState() {
    synchronized (progressLock) {
        numFinishedJobVertices++;

        if (numFinishedJobVertices == verticesInCreationOrder.size()) { // have all job vertices reached a final state?

            // we are done, transition to the final state
            JobStatus current;
            while (true) {
                current = this.state;

                if (current == JobStatus.RUNNING) {
                    if (transitionState(current, JobStatus.FINISHED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.CANCELLING) {
                    if (transitionState(current, JobStatus.CANCELED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.FAILING) {
                    boolean allowRestart = !(failureCause instanceof SuppressRestartsException);

                    if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
                        restartStrategy.restart(this);
                        break;
                    } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.SUSPENDED) {
                    // we've already cleaned up when entering the SUSPENDED state
                    break;
                }
                else if (current.isGloballyTerminalState()) {
                    LOG.warn("Job has entered globally terminal state without waiting for all " +
                        "job vertices to reach final state.");
                    break;
                }
                else {
                    fail(new Exception("ExecutionGraph went into final state from state " + current));
                    break;
                }
            }
            // done transitioning the state

            // also, notify waiters
            progressLock.notifyAll();
        }
    }
}
```
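The two counters above (subtasks per vertex, vertices per graph) and the final decision can be sketched as follows. SketchVertex and SketchJobFinalizer are hypothetical; canRestart stands for allowRestart && restartStrategy.canRestart(), and the SUSPENDED, already-terminal and unexpected-state branches are collapsed into "leave the status unchanged".

```java
// Hypothetical per-vertex counter of subtasks in a final state.
class SketchVertex {
    private final boolean[] finishedSubtasks;
    private int numSubtasksInFinalState;

    SketchVertex(int parallelism) { finishedSubtasks = new boolean[parallelism]; }

    // Returns true exactly once: when the last subtask reaches a final state.
    synchronized boolean subtaskInFinalState(int subtask) {
        if (finishedSubtasks[subtask]) {
            return false; // a repeated report for the same subtask is ignored
        }
        finishedSubtasks[subtask] = true;
        numSubtasksInFinalState++;
        return numSubtasksInFinalState == finishedSubtasks.length;
    }
}

// Hypothetical decision taken once every job vertex is final.
class SketchJobFinalizer {
    enum Status { RUNNING, CANCELLING, FAILING, FINISHED, CANCELED, RESTARTING, FAILED, SUSPENDED }

    static Status finalStatus(Status current, boolean canRestart) {
        switch (current) {
            case RUNNING:    return Status.FINISHED;
            case CANCELLING: return Status.CANCELED;
            case FAILING:    return canRestart ? Status.RESTARTING : Status.FAILED;
            default:         return current; // other branches collapsed in this sketch
        }
    }
}
```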

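If the job status is JobStatus.FAILING and a restart is allowed, the job moves to JobStatus.RESTARTING via transitionState(current, JobStatus.RESTARTING), and the restart strategy is invoked:

```java
restartStrategy.restart(this);
```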
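restartStrategy.restart(this) hands control to a pluggable strategy. As an illustration, here is a hypothetical fixed-delay strategy that permits a bounded number of restarts, each scheduled after a fixed delay. It borrows only the idea of Flink's fixed-delay strategy, not its code; SketchFixedDelayRestart is a made-up name, and the scheduler is injected as a BiConsumer so the sketch stays deterministic.

```java
import java.util.function.BiConsumer;

// Hypothetical fixed-delay restart strategy (illustrative, not Flink's class).
class SketchFixedDelayRestart {
    private final int maxAttempts;
    private final long delayMillis;
    // Receives (task, delayMillis); in production this could wrap ScheduledExecutorService.schedule.
    private final BiConsumer<Runnable, Long> scheduler;
    private int attempts;

    SketchFixedDelayRestart(int maxAttempts, long delayMillis, BiConsumer<Runnable, Long> scheduler) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
        this.scheduler = scheduler;
    }

    // Mirrors the canRestart() check made before entering RESTARTING.
    synchronized boolean canRestart() {
        return attempts < maxAttempts;
    }

    // Counts the attempt and schedules the graph restart after the fixed delay.
    synchronized void restart(Runnable restartGraph) {
        attempts++;
        scheduler.accept(restartGraph, delayMillis);
    }
}
```

Once the attempts are used up, canRestart() returns false, and in the code above a FAILING job then goes to FAILED instead of RESTARTING.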

The restart strategy is configurable, but whichever strategy is used eventually calls:

```java
executionGraph.restart();

public void restart() {
    try {
        synchronized (progressLock) {
            JobStatus current = state;

            if (current == JobStatus.CANCELED) {
                LOG.info("Canceled job during restart. Aborting restart.");
                return;
            } else if (current == JobStatus.FAILED) {
                LOG.info("Failed job during restart. Aborting restart.");
                return;
            } else if (current == JobStatus.SUSPENDED) {
                LOG.info("Suspended job during restart. Aborting restart.");
                return;
            } else if (current != JobStatus.RESTARTING) {
                throw new IllegalStateException("Can only restart job from state restarting.");
            }

            if (scheduler == null) {
                throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
            }

            this.currentExecutions.clear();

            Collection<CoLocationGroup> colGroups = new HashSet<>();

            for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
                CoLocationGroup cgroup = jv.getCoLocationGroup();
                if (cgroup != null && !colGroups.contains(cgroup)) {
                    cgroup.resetConstraints();
                    colGroups.add(cgroup);
                }

                jv.resetForNewExecution();
            }

            for (int i = 0; i < stateTimestamps.length; i++) {
                if (i != JobStatus.RESTARTING.ordinal()) {
                    // Only clear the non restarting state in order to preserve when the job was
                    // restarted. This is needed for the restarting time gauge
                    stateTimestamps[i] = 0;
                }
            }
            numFinishedJobVertices = 0;
            transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

            // if we have checkpointed state, reload it into the executions
            if (checkpointCoordinator != null) {
                boolean restored = checkpointCoordinator
                        .restoreLatestCheckpointedState(getAllVertices(), false, false); // reload the latest checkpoint and its state

                // TODO(uce) Temporary work around to restore initial state on
                // failure during recovery. Will be superseded by FLINK-3397.
                if (!restored && savepointCoordinator != null) {
                    String savepointPath = savepointCoordinator.getSavepointRestorePath();
                    if (savepointPath != null) {
                        savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
                    }
                }
            }
        }

        scheduleForExecution(scheduler); // hand the ExecutionGraph to the scheduler again, i.e. resubmit it
    }
    catch (Throwable t) {
        fail(t);
    }
}
```
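The guard-and-reset structure of restart() (give up if the job left RESTARTING in the meantime, reject any other unexpected state, reset, move to CREATED, then schedule outside the lock) is sketched below. SketchRestartableGraph is hypothetical, and log entries stand in for resetting vertices, restoring the checkpoint and scheduling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical graph: log entries stand in for the real reset/restore/schedule work.
class SketchRestartableGraph {
    enum Status { CREATED, RESTARTING, CANCELED, FAILED, SUSPENDED }

    Status state = Status.RESTARTING;
    int numFinishedJobVertices = 3;
    final List<String> log = new ArrayList<>();

    void restart() {
        synchronized (this) { // plays the role of progressLock
            switch (state) {
                case CANCELED:
                case FAILED:
                case SUSPENDED:
                    // the job left RESTARTING while the restart was pending: give up quietly
                    log.add("aborted: " + state);
                    return;
                case RESTARTING:
                    break;
                default:
                    throw new IllegalStateException("Can only restart job from state restarting.");
            }
            numFinishedJobVertices = 0;            // reset bookkeeping (vertices would be reset here)
            state = Status.CREATED;                // RESTARTING -> CREATED
            log.add("restored latest checkpoint"); // stands in for restoreLatestCheckpointedState
        }
        log.add("scheduled");                      // scheduleForExecution runs outside the lock
    }
}
```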

Reposted from: http://dcdol.baihongyu.com/
