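The first version of Oozie shipped with a custom-built monitoring layer; later releases brought in Codahale Metrics, the mainstream metrics framework today. This article starts with Oozie's own Instrumentation and then looks at how it is connected to Codahale Metrics.
Oozie's own Instrumentation framework defines four types of metrics: Timers, Counters, Variables, and Samplers. Every metric is classified by two keys, group and name.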
public class Instrumentation {
    private ScheduledExecutorService scheduler;
    // One lock per metric type
    private Lock counterLock;
    private Lock timerLock;
    private Lock variableLock;
    private Lock samplerLock;
    // Metric type -> group -> name -> element
    private Map<String, Map<String, Map<String, Object>>> all;
    // group -> name -> element, one map per metric type
    private Map<String, Map<String, Element<Long>>> counters;
    private Map<String, Map<String, Element<Timer>>> timers;
    private Map<String, Map<String, Element<Variable>>> variables;
    private Map<String, Map<String, Element<Double>>> samplers;
    // ... (remaining members omitted from this excerpt)
}
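To make the group/name keying concrete, here is a minimal sketch of a two-level registry. It is not Oozie's implementation: the class `GroupNameRegistry` and its `register` and `lookup` methods are hypothetical names chosen for illustration, and values are modeled as plain `Supplier` objects rather than Oozie's `Element` type.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical sketch of group -> name -> element keying; not Oozie's real class.
class GroupNameRegistry {
    // group -> name -> supplier of the metric's current value
    private final Map<String, Map<String, Supplier<?>>> all = new TreeMap<>();

    // Register a metric under (group, name); a duplicate key is rejected.
    synchronized void register(String group, String name, Supplier<?> element) {
        Map<String, Supplier<?>> byName = all.computeIfAbsent(group, g -> new TreeMap<>());
        if (byName.putIfAbsent(name, element) != null) {
            throw new IllegalArgumentException("already defined: " + group + "/" + name);
        }
    }

    // Return the current value of a metric, or null if it was never registered.
    synchronized Object lookup(String group, String name) {
        Map<String, Supplier<?>> byName = all.get(group);
        Supplier<?> element = (byName == null) ? null : byName.get(name);
        return (element == null) ? null : element.get();
    }

    public static void main(String[] args) {
        GroupNameRegistry registry = new GroupNameRegistry();
        registry.register("jvm", "free.memory", () -> Runtime.getRuntime().freeMemory());
        System.out.println("jvm/free.memory = " + registry.lookup("jvm", "free.memory"));
    }
}
```

Keeping the outer key as the group means all metrics of one subsystem can be listed or exported together.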
Definition of the Cron stopwatch:
/**
 * Cron is a stopwatch that can be started/stopped several times. <p/> This class is not thread safe, it does not
 * need to be. <p/> It keeps track of the total time (first start to last stop) and the running time (total time
 * minus the stopped intervals). <p/> Once a Cron is complete it must be added to the corresponding group/name in a
 * Instrumentation instance.
 */
public static class Cron {
    private long start;
    private long end;
    private long lapStart;
    private long own;
    private long total;
    private boolean running;

    /**
     * Creates new Cron, stopped, in zero.
     */
    public Cron() {
        running = false;
    }

    /**
     * Start the cron. It cannot be already started.
     */
    public void start() {
        if (!running) {
            if (lapStart == 0) {
                lapStart = System.currentTimeMillis();
                if (start == 0) {
                    start = lapStart;
                    end = start;
                }
            }
            running = true;
        }
    }

    /**
     * Stops the cron. It cannot be already stopped.
     */
    public void stop() {
        if (running) {
            end = System.currentTimeMillis();
            if (start == 0) {
                start = end;
            }
            total = end - start;
            if (lapStart > 0) {
                own += end - lapStart;
                lapStart = 0;
            }
            running = false;
        }
    }

    // ... (remaining methods, such as getOwn() and getTotal(), omitted from this excerpt)
}
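The difference between total time and own (running) time is easiest to see with a fake clock. The sketch below is a simplified stand-in for Cron, not the Oozie class itself: `StopwatchSketch` and its injectable `LongSupplier` clock are assumptions made so that the example is deterministic.

```java
import java.util.function.LongSupplier;

// Simplified stand-in for Cron; the injectable clock is an assumption for illustration.
class StopwatchSketch {
    private final LongSupplier clock;
    private long start;
    private long end;
    private long lapStart;
    private long own;
    private long total;
    private boolean running;

    StopwatchSketch(LongSupplier clock) {
        this.clock = clock;
    }

    void start() {
        if (!running) {
            lapStart = clock.getAsLong();
            if (start == 0) {          // first start: remember the overall start time
                start = lapStart;
                end = start;
            }
            running = true;
        }
    }

    void stop() {
        if (running) {
            end = clock.getAsLong();
            total = end - start;       // first start to last stop, pauses included
            own += end - lapStart;     // only the time spent running in this lap
            running = false;
        }
    }

    long getOwn() { return own; }
    long getTotal() { return total; }

    public static void main(String[] args) {
        long[] now = {1000};
        StopwatchSketch cron = new StopwatchSketch(() -> now[0]);
        cron.start(); now[0] = 1030; cron.stop();   // runs for 30 ms
        now[0] = 1100;                              // paused for 70 ms
        cron.start(); now[0] = 1150; cron.stop();   // runs for 50 ms
        System.out.println("total=" + cron.getTotal() + " own=" + cron.getOwn());
        // prints "total=150 own=80"
    }
}
```

Total time covers the 70 ms pause, while own time counts only the two running laps.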
Counter definition:
- action.executors - Counters related to actions.
  - [action_type]#action.[operation_performed] (start, end, check, kill)
  - [action_type]#ex.[exception_type] (transient, non-transient, error, failed)
- callablequeue - Count of events in various execution queues.
  - delayed.queued: Number of commands queued with a delay.
  - executed: Number of executions from the queue.
  - failed: Number of queue attempts which failed.
  - queued: Number of queued commands.
- commands: Execution counts for various commands. This data is generated for all commands.
  - action.end
  - action.notification
  - action.start
  - callback
  - job.info
  - job.notification
  - purge
  - signal
  - start
  - submit
- jobs: Job statistics.
  - start: Number of started jobs.
  - submit: Number of submitted jobs.
  - succeeded: Number of jobs which succeeded.
  - kill: Number of killed jobs.
- authorization
  - failed: Number of failed authorization attempts.
- webservices: Number of requests to various web services along with the request type.
  - failed: Total number of failed requests.
  - requests: Total number of requests.
  - admin
  - admin-GET
  - callback
  - callback-GET
  - jobs
  - jobs-GET
  - jobs-POST
  - version
  - version-GET
private static class Counter extends AtomicLong implements Element<Long> {

    /**
     * Return the counter snapshot.
     *
     * @return the counter snapshot.
     */
    public Long getValue() {
        return get();
    }

    /**
     * Return the String representation of the counter value.
     *
     * @return the String representation of the counter value.
     */
    public String toString() {
        return Long.toString(get());
    }
}
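Because Counter extends `AtomicLong`, increments from several threads do not need an extra lock. The following sketch illustrates that property; `CounterSketch` and its `hammer` helper are hypothetical names, and the thread and iteration counts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Counter: an AtomicLong exposing a snapshot getter.
class CounterSketch extends AtomicLong {
    Long getValue() {
        return get();
    }

    // Runs `threads` workers that each increment one shared counter `perThread` times.
    static long hammer(int threads, int perThread) {
        CounterSketch counter = new CounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet();   // atomic, so no update is lost
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.getValue();
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 10_000));   // prints 40000
    }
}
```

With a plain `long` field and `++`, the same run could lose updates under contention.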
Timer definition:
- action.executors - Timers related to actions.
  - [action_type]#action.[operation_performed] (start, end, check, kill)
- callablequeue
  - time.in.queue: Time a callable spent in the queue before being processed.
- commands: Generated for all commands.
  - action.end
  - action.notification
  - action.start
  - callback
  - job.info
  - job.notification
  - purge
  - signal
  - start
  - submit
- Timers related to various database operations.
  - create-workflow
  - load-action
  - load-pending-actions
  - load-running-actions
  - load-workflow
  - load-workflows
  - purge-old-workflows
  - save-action
  - update-action
  - update-workflow
- webservices
  - admin
  - admin-GET
  - callback
  - callback-GET
  - jobs
  - jobs-GET
  - jobs-POST
  - version
  - version-GET
public static class Timer implements Element<Timer> {
    Lock lock = new ReentrantLock();
    private long ownTime;
    private long totalTime;
    private long ticks;
    private long ownSquareTime;
    private long totalSquareTime;
    private long ownMinTime;
    private long ownMaxTime;
    private long totalMinTime;
    private long totalMaxTime;

    /**
     * Timer constructor. <p/> It is project private for test purposes.
     */
    Timer() { }

    /**
     * Return the String representation of the timer value.
     *
     * @return the String representation of the timer value.
     */
    public String toString() {
        return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
    }

    /**
     * Return the timer snapshot.
     *
     * @return the timer snapshot.
     */
    public Timer getValue() {
        try {
            lock.lock();
            Timer timer = new Timer();
            timer.ownTime = ownTime;
            timer.totalTime = totalTime;
            timer.ticks = ticks;
            timer.ownSquareTime = ownSquareTime;
            timer.totalSquareTime = totalSquareTime;
            timer.ownMinTime = ownMinTime;
            timer.ownMaxTime = ownMaxTime;
            timer.totalMinTime = totalMinTime;
            timer.totalMaxTime = totalMaxTime;
            return timer;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Add a cron to a timer. <p/> It is project private for test purposes.
     *
     * @param cron Cron to add.
     */
    void addCron(Cron cron) {
        try {
            lock.lock();
            long own = cron.getOwn();
            long total = cron.getTotal();
            ownTime += own;
            totalTime += total;
            ticks++;
            ownSquareTime += own * own;
            totalSquareTime += total * total;
            if (ticks == 1) {
                ownMinTime = own;
                ownMaxTime = own;
                totalMinTime = total;
                totalMaxTime = total;
            }
            else {
                ownMinTime = Math.min(ownMinTime, own);
                ownMaxTime = Math.max(ownMaxTime, own);
                totalMinTime = Math.min(totalMinTime, total);
                totalMaxTime = Math.max(totalMaxTime, total);
            }
        }
        finally {
            lock.unlock();
        }
    }

    // ... (remaining methods, such as getTotalAvg() and getOwnAvg(), omitted from this excerpt)
}
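The Timer keeps only running sums (time, squared time, ticks) plus min and max, which is enough to derive an average and a standard deviation without storing individual measurements. The sketch below shows one standard way to do that; `TimerStatsSketch` is a hypothetical class, and the sample standard deviation formula used here is an assumption, not necessarily the formula Oozie applies.

```java
// Hypothetical sketch: statistics derived from running sums, as a Timer-like class could do.
class TimerStatsSketch {
    private long ticks;
    private long sum;
    private long squareSum;
    private long min;
    private long max;

    // Record one measurement in milliseconds.
    void add(long millis) {
        sum += millis;
        squareSum += millis * millis;
        ticks++;
        if (ticks == 1) {
            min = millis;
            max = millis;
        } else {
            min = Math.min(min, millis);
            max = Math.max(max, millis);
        }
    }

    double avg() {
        return ticks == 0 ? 0.0 : (double) sum / ticks;
    }

    // Sample standard deviation computed from n, the sum, and the sum of squares.
    double stdDev() {
        if (ticks < 2) {
            return 0.0;
        }
        double n = ticks;
        return Math.sqrt((n * squareSum - (double) sum * sum) / (n * (n - 1)));
    }

    long min() { return min; }
    long max() { return max; }

    public static void main(String[] args) {
        TimerStatsSketch stats = new TimerStatsSketch();
        stats.add(10);
        stats.add(20);
        stats.add(30);
        System.out.println("avg=" + stats.avg() + " stdDev=" + stats.stdDev()
                + " min=" + stats.min() + " max=" + stats.max());
    }
}
```

For the measurements 10, 20, and 30 the sums are 60 and 1400, giving an average of 20 and a sample standard deviation of 10.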
Variable definition:
- oozie
  - version: Oozie build version.
- configuration
  - config.dir: Directory from which the configuration files are loaded. If null, all configuration files are loaded from the classpath.
  - config.file: The Oozie custom configuration for the instance.
- jvm
  - free.memory
  - max.memory
  - total.memory
- locks
  - locks: Locks are used by Oozie to synchronize access to workflow and action entries when the database being used does not support 'select for update' queries. (MySQL supports 'select for update').
- logging
  - config.file: Log4j '.properties' configuration file.
  - from.classpath: Whether the config file has been read from the classpath or from the config directory.
  - reload.interval: Interval at which the config file will be reloaded. 0 if the config file will never be reloaded; a file loaded from the classpath is never reloaded.
public interface Variable<T> extends Element<T> {}
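A Variable has no state of its own; it computes its value each time it is read. As a rough illustration of how the jvm group could be backed, the sketch below wraps `Runtime` calls. `ElementSketch`, `VariableSketch`, and `JvmVariables` are hypothetical stand-ins for the Oozie types, which are not reproduced here.

```java
// Hypothetical stand-ins for Oozie's Element and Variable interfaces.
interface ElementSketch<T> {
    T getValue();
}

interface VariableSketch<T> extends ElementSketch<T> {
}

// Variables for a "jvm" group; each value is read on demand from the running JVM.
class JvmVariables {
    static VariableSketch<Long> freeMemory() {
        return () -> Runtime.getRuntime().freeMemory();
    }

    static VariableSketch<Long> maxMemory() {
        return () -> Runtime.getRuntime().maxMemory();
    }

    static VariableSketch<Long> totalMemory() {
        return () -> Runtime.getRuntime().totalMemory();
    }

    public static void main(String[] args) {
        System.out.println("free.memory  = " + freeMemory().getValue());
        System.out.println("max.memory   = " + maxMemory().getValue());
        System.out.println("total.memory = " + totalMemory().getValue());
    }
}
```

Since the interface has a single method, each variable can be written as a lambda.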
Sampler definition:
- callablequeue
  - delayed.queue.size: The size of the delayed command queue.
  - queue.size: The size of the command queue.
  - threads.active: The number of threads processing callables.
- jdbc:
  - connections.active: Active Connections over the past minute.
- webservices: Requests to the Oozie HTTP endpoints over the last minute.
  - admin
  - callback
  - job
  - jobs
  - requests
  - version
private static class Sampler implements Element<Double>, Runnable {
    private Lock lock = new ReentrantLock();
    private int samplingInterval;
    private Variable<Long> variable;
    private long[] values;
    private int current;
    private long valuesSum;
    private double rate;

    public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
        this.samplingInterval = samplingInterval;
        this.variable = variable;
        values = new long[samplingPeriod / samplingInterval];
        valuesSum = 0;
        current = -1;
    }

    public int getSamplingInterval() {
        return samplingInterval;
    }

    public void run() {
        try {
            lock.lock();
            long newValue = variable.getValue();
            if (current == -1) {
                valuesSum = newValue;
                current = 0;
                values[current] = newValue;
            }
            else {
                current = (current + 1) % values.length;
                valuesSum = valuesSum - values[current] + newValue;
                values[current] = newValue;
            }
            rate = ((double) valuesSum) / values.length;
        }
        finally {
            lock.unlock();
        }
    }

    public Double getValue() {
        return rate;
    }
}
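The Sampler is a ring buffer that yields a moving average over the last samplingPeriod / samplingInterval readings. The sketch below reproduces that arithmetic in a simplified form; `RollingAverageSketch` is a hypothetical class, the `LongSupplier` source replaces Oozie's `Variable<Long>`, and `sample()` is called by hand where a scheduler would presumably call `run()` at each interval.

```java
import java.util.function.LongSupplier;

// Simplified stand-in for Sampler: a fixed-size ring buffer with a running sum.
class RollingAverageSketch {
    private final long[] values;
    private final LongSupplier source;
    private int current = -1;
    private long valuesSum;
    private double rate;

    RollingAverageSketch(int samplingPeriod, int samplingInterval, LongSupplier source) {
        this.values = new long[samplingPeriod / samplingInterval];
        this.source = source;
    }

    // Take one reading: overwrite the oldest slot and adjust the running sum.
    synchronized void sample() {
        long newValue = source.getAsLong();
        current = (current + 1) % values.length;
        valuesSum = valuesSum - values[current] + newValue;
        values[current] = newValue;
        rate = ((double) valuesSum) / values.length;
    }

    synchronized double getValue() {
        return rate;
    }

    public static void main(String[] args) {
        long[] queueSize = {8};
        // 60 / 15 gives a window of 4 slots
        RollingAverageSketch sampler = new RollingAverageSketch(60, 15, () -> queueSize[0]);
        for (int i = 0; i < 4; i++) {
            sampler.sample();
            System.out.println(sampler.getValue());   // 2.0, 4.0, 6.0, 8.0
        }
        queueSize[0] = 0;
        sampler.sample();
        System.out.println(sampler.getValue());       // 6.0
    }
}
```

The sum is always divided by the full window length, so the reported value ramps up until the window has been filled once.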