Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RUN cd /app && npm install --production

# Bundle app source
COPY *.js ./
COPY protocols/*.js protocols/
COPY openapi/l7mp-openapi.yaml openapi/

# Expose the control port
Expand Down
117 changes: 116 additions & 1 deletion 117 cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const Rule = require('./rule.js').Rule;
const monitoring = require('./monitoring.js').Monitoring
const clusterMetricRegistry = require('./monitoring').clusterMetricRegistry;
const metricClusterMetricRegistry = require('./monitoring').metricClusterMetricRegistry;
const DNSProtocol = require('./protocols/dns.js').DNSProtocol;

const {L7mpError, Ok, InternalError, BadRequestError, NotFoundError, GeneralError} = require('./error.js');

Expand Down Expand Up @@ -395,6 +396,18 @@ class JSONSocketEndPoint extends EndPoint {
// }
};

class L7ProtocolEndPoint extends EndPoint {
constructor(c, e) {
super(c, e);
this.transport_endpoint = this.cluster.transport.addEndPoint(e);
}

connect(s){
log.silly('L7ProtocolEndpoint.connect:', dumper(s, 1));
return this.transport_endpoint.connect(s);
}
};

EndPoint.create = (c, e) => {
log.silly('EndPoint.create:', `Protocol: ${c.protocol}` );
switch(c.protocol){
Expand All @@ -405,6 +418,7 @@ EndPoint.create = (c, e) => {
case 'UnixDomainSocket': return new NetSocketEndPoint(c, e);
case 'Test': return new TestEndPoint(c, e);
case 'JSONSocket': return new JSONSocketEndPoint(c, e);
case 'L7Protocol': return new L7ProtocolEndPoint(c, e);
default:
let err = `Adding an endpoint to a cluster of type "${c.protocol}" is not supported`;
log.warn('EndPoint.create:', err);
Expand Down Expand Up @@ -623,7 +637,7 @@ class JSONSocketCluster extends Cluster {
log.info('JSONSocketCluster:', `${this.name} initialized, using transport:`,
dumper(this.transport, 2));
}

// 2. add JSONSocket fields to metadata and send as header
// 4. wait for a valid response header from server and return stream to caller
async _stream(s){
Expand Down Expand Up @@ -738,6 +752,106 @@ class JSONSocketCluster extends Cluster {
}
};

class L7ProtocolCluster extends Cluster {
constructor(c) {
super(c);

let cu = {
name: c.name + '-transport-cluster',
spec: c.spec.transport_spec || c.spec.transport,
loadbalancer: c.loadbalancer || { policy: 'Trivial' },
};
if(!cu.spec){
let e = 'L7ProtocolCluster: No transport specified';
log.warn(e);
throw new Error(e);
}
this.transport = Cluster.create(cu);
this.type = this.transport.type;
this.header = c.spec.header || [];
this.loadbalancer = this.transport.loadbalancer;

switch(this.protocol){
case 'DNS':
this.l7_proto = new DNSProtocol;
break;
default:
log.info('L7ProtocolListener:', `protocol: ${this.protocol}:`, `${this.name}:`,
`Unknown protocol ${this.protocol}`);
return null; // will break soon
}

log.info('L7ProtocolCluster:', `protocol: ${this.protocol}: ${this.name} initialized,`,
`using transport:`, dumper(this.transport, 2));
}

// must specialcase
addEndPoint(e){
log.silly('L7ProtocolCluster.addEndPoint:', `protocol: ${this.protocol}:`,
`cluster: ${this.name}:`, dumper(e, 2));
let c = {... this};
c.protocol = 'L7Protocol';
let ep = EndPoint.create(c, e);
this.endpoints.push(ep);
this.loadbalancer.update(this.endpoints);
return ep;
}

async stream(s){
log.silly('L7ProtocolCluster.stream:', `protocol: ${this.protocol}:`, `Session: ${s.name}`);
return this.transport.stream(s).then( async (x) => {
let endpoint = x.endpoint;
let stream = x.stream;
if(!(stream.readableObjectMode || stream.writableObjectMode))
log.warn('L7ProtocolCluster.stream:', `${this.name}:`,
`Transport is not message-based, ${this.protocol} headers may be fragmented`);

if(typeof s.metadata[this.protocol] === 'undefined'){
let err = `No metadata for protocol ${this.protocol}, cannot generate L7 protocol message`;
log.warn('L7ProtocolCluster:', `protocol: ${this.protocol}:`, `${this.name}:`, err);
stream.destroy();
return Promise.reject(new GeneralError(err));
}

try {
var msg = this.l7_proto.encode(s.metadata[this.protocol], 'queryMessage');
} catch(e){
dump(e, 10);
let err = `Failed to encode metadata for protocol ${this.protocol}: ` + e.message;
log.warn('L7ProtocolCluster:', `protocol: ${this.protocol}:`, `${this.name}:`, err);
stream.destroy();
return Promise.reject(new GeneralError(err));
}

stream.write(msg);

try {
var data = await pEvent(stream, 'data', {
rejectionEvents: ['close', 'error'],
multiArgs: false, timeout: s.route.retry.timeout,
});
} catch(e){
let err = `Failed to receive ${this.protocol} response in `+
`${s.route.retry.timeout} msecs: ` + e.message;
log.warn('L7ProtocolCluster:', `protocol: ${this.protocol}:`, `${this.name}:`, err);
stream.destroy();
return Promise.reject(new GeneralError(err));
}

log.silly('L7ProtocolCluster:', `${this.name}:`,
`Received ${this.protocol} message of length ${data.length} bytes`);

// setImmediate(() => stream.end(data));
setImmediate(() => {
stream.emit('data', data);
stream.destroy();
});

return { stream: stream, endpoint: endpoint};
});
}
};

// non-transport

class L7mpControllerCluster extends Cluster {
Expand Down Expand Up @@ -1068,6 +1182,7 @@ Cluster.create = (c) => {
case 'TCP': return new NetSocketCluster(c);
case 'UnixDomainSocket': return new NetSocketCluster(c);
case 'JSONSocket': return new JSONSocketCluster(c);
case 'DNS': return new L7ProtocolCluster(c);
case 'Stdio': return new StdioCluster(c);
case 'Echo': return new EchoCluster(c);
case 'Discard': return new DiscardCluster(c);
Expand Down
105 changes: 103 additions & 2 deletions 105 listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const pEvent = require('p-event');

const StreamCounter = require('./stream-counter.js').StreamCounter;
const utils = require('./stream.js');
const DNSProtocol = require('./protocols/dns.js').DNSProtocol;

const {L7mpError, Ok, InternalError, BadRequestError, NotFoundError, GeneralError} = require('./error.js');

// for get/setAtPath()
Expand Down Expand Up @@ -463,7 +465,7 @@ class UDPListener extends Listener {
}

this.reconnect = this.reconnectSingleton.bind(this);

log.verbose(`UDPListener.runSingleton: "${this.name}"/${this.mode}`,
`connected to remote`,
`${connection.remote_address}:`+
Expand All @@ -489,7 +491,7 @@ class UDPListener extends Listener {
this.onRequest = onconn;
return stream;
}

// SERVER
async runServer(){
log.silly('UDPListener:runServer', `"${this.name}"`);
Expand Down Expand Up @@ -912,6 +914,104 @@ class JSONSocketListener extends Listener {
// Do not reemit header!: setImmediate(() => { l.stream.emit("data", chunk); });
};

class L7ProtocolListener extends Listener {
constructor(l){
super(l);

// create transport
let li = {
name: this.name + '-transport-listener',
spec: l.spec.transport_spec || l.spec.transport,
};
if(!li.spec){
let e = 'L7ProtocolListener: No transport specified';
log.warning(e);
throw new Error(e);
}
this.transport = Listener.create(li);
this.type = this.transport.type;

switch(this.protocol){
case 'DNS':
this.l7_proto = new DNSProtocol;
break;
default:
log.info('L7ProtocolListener:', `protocol: ${this.protocol}:`, `${this.name}:`,
`Unknown protocol ${this.protocol}`);
return null; // will break soon
}

log.info('L7ProtocolListener:', `protocol: ${this.protocol}: ${this.name} initialized,`,
`using transport:`, dumper(this.transport.spec, 2));
}

async run(){
log.info('L7ProtocolListener.run');
// this.transport.on('emit', this.onSession.bind(this));
this.transport.emitter = this.onSession.bind(this);
try {
await this.transport.run();
} catch(err){
log.info(`L7ProtocolListener.run: Cannot add transport: ${err.message}`);
throw err;
}
// eventDebug(socket);
}

close(){
log.info('L7ProtocolListener.close');
this.transport.close();
}

// WARNING: we may lose the JSON header here when the transport (e.g., UDP) re-emits the packet
// using setImmediate();
async onSession(s){
log.silly('L7ProtocolListener:', `protocol: ${this.protocol}:`,
`New connection request: "${s.metadata.name}"`);
let m = s.metadata;
let l = s.source;

// 2. wait for first packet: stream should be in object mode so we should be able to read
// the entire "JSON header" in one shot
let data = await pEvent(l.stream, 'data', {
rejectionEvents: ['close', 'error'],
multiArgs: false, // TODO: support TIMEOUT: timeout: this.timeout
});

log.silly('L7ProtocolListener:', `protocol: ${this.protocol}:`, `${this.name}:`,
`Received packet, checking for ${this.protocol} header`);

// warn if we are not in object mode
if(typeof data !== 'object' ||
!(l.stream.readableObjectMode || l.stream.writableObjectMode))
log.warn('L7ProtocolListener:', `protocol: ${this.protocol}:`, `${this.name}:`,
`Transport is not message-based, ${this.protocol} header may be fragmented`);

let l7_header = {};
switch(this.protocol){
case 'DNS':
try {
l7_header = this.l7_proto.decode(data, 'queryMessage').val;
} catch(err){
log.info('L7ProtocolListener:', `protocol: ${this.protocol}:`, `${this.name}:`,
`Invalid ${this.protocol} header:`, dumper(err,3));
l.stream.end();
return;
}
break;
}

m[this.protocol] = l7_header;
let session = await this.emitSession(m, l.stream, {res: l.stream, error: this.error});

return session;
}

error(priv, err){
priv.res.write('error');
}
};

Listener.create = (l) => {
log.silly('Listener.create', dumper(l, 8));
let protocol = l.spec.protocol;
Expand All @@ -922,6 +1022,7 @@ Listener.create = (l) => {
case 'TCP': return new NetServerListener(l);
case 'UnixDomainSocket': return new NetServerListener(l);
case 'JSONSocket': return new JSONSocketListener(l);
case 'DNS': return new L7ProtocolListener(l);
case 'Test': return new TestListener(l);
default:
let err = 'Listener.create:'+ `Unknown protocol: "${protocol}"`;
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.