Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

LeoPlatform/Nodejs

Open more actions menu

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

706 Commits
706 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

A Node/Typescript SDK that pushes data to and pulls data from an instance of an RStreams bus.

Full documentation and getting started guide here: rstreams-site-url/rstreams-node-sdk/

For this SDK API documentation, get started with the RStreamsSdk.

Put

Send events one at a time to an instance of the RStreams bus.

Put Operation documentation

import { ConfigurationResources, RStreamsSdk } from "leo-sdk";
import { PersonRaw, PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();
  const person = await getRandomPerson();
  await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person);
}

async function getRandomPerson(): Promise<PersonRaw> {
  const NUM_EVENTS = 1;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);
  
  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }
  
  console.log('Person: ' + data.results[0].name.first + ' ' + data.results[0].name.last);

  return data.results[0];
}

(async () => {
  await main();
})()

Enrich

Read events from one queue, modify them and send them on to another queue of the RStreams bus.

Enrich Operation documentation

import { EnrichOptions,  RStreamsSdk } from "leo-sdk";
import { Person, PersonRaw } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();
  const opts: EnrichOptions<PersonRaw, Person>  = {
    id: 'rstreams-example.people-to-peopleplus',
    inQueue: 'rstreams-example.people',
    outQueue: 'rstreams-example.peopleplus',
    start: 'z/2022/04/20',
    config: {
      limit: 2
    },
    transform: async (person: PersonRaw) => {
      const p: Person = translate(person);
      await addCountryCode(p);
      return p;
    }
  };

  await rsdk.enrichEvents<PersonRaw, Person>(opts);
}

interface CountryCode {cca2: string;}

/**
 * @param person The person to add addr.countryCode to by calling a public API to
 *               turn a country name in a 2 digit country code (iso cca2)
 */
 async function addCountryCode(person: Person): Promise<void> {
    const url = `https://restcountries.com/v3.1/name/${person.addr.country}?fullText=true&fields=cca2`;    
    const cc: CountryCode = await axios.get(url);
    person.addr.countryCode = cc.cca2;
  }

/**
 * @param p The type from the public API we want to modify
 * @returns The new type that is flatter and gets rid of some attributes don't need
 */
function translate(p: PersonRaw): Person {
  return {
    gender: p.gender,
    firstName: p.name.first,
    lastName: p.name.last,
    email: p.email,
    birthDate: p.dob.date,
    nationality: p.nat,
    addr: {
      addr1: p.location.street.number + ' ' + p.location.street.name,
      city: p.location.city, 
      state: p.location.state,
      country: p.location.country,
      postcode: p.location.postcode,
      longitude: p.location.coordinates.longitude,
      latitude: p.location.coordinates.latitude,
      tzOffset: p.location.timezone.offset,
      tzDesc: p.location.timezone.description
    }
  }
}

(async () => {
  await main();
})()

Offload

Stream data down from an RStreams bus queue at scale, optionally modify it and send it to another source such as a database or file.

Offload Operation documentation

import { OffloadOptions,  RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import axios, { AxiosResponse } from "axios";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();
  const opts: OffloadOptions<Person>  = {
    id: 'rstreams-example.offload-one-peopleplus',
    inQueue: 'rstreams-example.people',
    start: 'z/2022/04/20',
    limit: 2,
    transform: async (person: Person) => {
        await savePerson(person);
        return true;        
    }
  };

  await rsdk.offloadEvents<Person>(opts);
}

interface PostResponse {
    success: boolean;
}

/**
 * @param person Save the person to another system.
 */
async function savePerson(person: Person): Promise<void> {
  const url = `https://run.mocky.io/v3/83997150-ab13-43da-9fb9-66051ba06c10?mocky-delay=500ms`;    
  const {data, status}: AxiosResponse<PostResponse, any> = await axios.post<PostResponse>(url, person);
  if (status !== 200 || !data || data.success !== true) {
    throw new Error('Saving person to external system failed');
  }
}

(async () => {
  await main();
})()

About

Nodejs SDK for the Leo Innovation Platform

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Morty Proxy This is a proxified and sanitized view of the page, visit original site.