diff --git a/.buildscripts/e2e.sh b/.buildscripts/e2e.sh new file mode 100755 index 00000000..01b6f92a --- /dev/null +++ b/.buildscripts/e2e.sh @@ -0,0 +1,14 @@ +#!/bin/sh + +set -e + +if [ "${RUN_E2E_TESTS}" != "true" ]; then + echo "Skipping end to end tests." +else + echo "Running end to end tests..." + wget https://github.com/segmentio/library-e2e-tester/releases/download/0.2.2/tester_linux_amd64 + chmod +x tester_linux_amd64 + chmod +x e2e_test.sh + ./tester_linux_amd64 -segment-write-key="${SEGMENT_WRITE_KEY}" -webhook-auth-username="${WEBHOOK_AUTH_USERNAME}" -webhook-bucket="${WEBHOOK_BUCKET}" -path='./e2e_test.sh' + echo "End to end tests completed!" +fi diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..7ef997ab --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,9 @@ +version: 2 +updates: +- package-ecosystem: pip + directory: "/" + schedule: + interval: daily + open-pull-requests-limit: 10 + reviewers: + - heitorsampaio diff --git a/.github/workflows/create_jira.yml b/.github/workflows/create_jira.yml new file mode 100644 index 00000000..8180ac0f --- /dev/null +++ b/.github/workflows/create_jira.yml @@ -0,0 +1,39 @@ +name: Create Jira Ticket + +on: + issues: + types: + - opened + +jobs: + create_jira: + name: Create Jira Ticket + runs-on: ubuntu-latest + environment: IssueTracker + steps: + - name: Checkout + uses: actions/checkout@master + - name: Login + uses: atlassian/gajira-login@master + env: + JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }} + JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }} + JIRA_API_TOKEN: ${{ secrets.JIRA_TOKEN }} + JIRA_EPIC_KEY: ${{ secrets.JIRA_EPIC_KEY }} + JIRA_PROJECT: ${{ secrets.JIRA_PROJECT }} + + - name: Create + id: create + uses: atlassian/gajira-create@master + with: + project: ${{ secrets.JIRA_PROJECT }} + issuetype: Bug + summary: | + [${{ github.event.repository.name }}] (${{ github.event.issue.number }}): ${{ github.event.issue.title }} + description: | + Github Link: ${{ github.event.issue.html_url }} + ${{ github.event.issue.body }} + fields: '{"parent": {"key": "${{ secrets.JIRA_EPIC_KEY }}"}}' + + - name: Log created issue + run: echo "Issue ${{ steps.create.outputs.issue }} was created" \ No newline at end of file diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000..65d6119b --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,57 @@ +name: Run Python Tests +on: + push: + branches: + - master + pull_request: + branches: + - master +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Python 3 + uses: actions/setup-python@v3 + with: + python-version: 3.9 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install python-dateutil backoff monotonic + pip install --user . + sudo pip install pylint==3.3.1 flake8 mock==3.0.5 python-dateutil aiohttp==3.9.1 + - name: Run tests + run: python -m unittest discover -s segment + +# snyk: +# runs-on: ubuntu-latest +# steps: +# - uses: actions/checkout@v3 +# #- attach_workspace: { at: . } +# - run: pip3 install pipreqs +# - run: pip3 install --user appdirs +# - run: pipreqs . 
+# - run: pip3 install --user -r requirements.txt +# - run: curl -sL https://raw.githubusercontent.com/segmentio/snyk_helpers/master/initialization/snyk.sh | sh# + +# test: +# #needs: ['coding-standard', 'lint'] +# runs-on: ubuntu-latest +# strategy: +# matrix: +# python: ['3.9', '3.10', '3.11'] +# coverage: [false] +# experimental: [false] +# include: +# # Run code coverage. +# - python: '3.9' +# coverage: true +# experimental: false +# - python: '3.10' +# coverage: true +# experimental: false +# - python: '3.11' +# coverage: true +# experimental: false \ No newline at end of file diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..97b401c2 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,47 @@ +name: analytics test suite + +on: + push: + branches: + - master + - '**Tests**' + paths-ignore: + - '**.md' + pull_request: + paths-ignore: + - '**.md' + +jobs: + test-setup-python: + name: Test setup-python + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Run with setup-python 3.9 + uses: actions/setup-python@v5 + with: + python-version: '3.9' + - name: Setup required modules + run: python -m pip install -r requirements.txt + - name: Run tests + run: python -m unittest discover -s segment + + - name: Run with setup-python 3.10 + uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: Setup required modules + run: python -m pip install -r requirements.txt + - name: Run tests + run: python -m unittest discover -s segment + + - name: Run with setup-python 3.11 + uses: actions/setup-python@v5 + with: + python-version: '3.11' + - name: Setup required modules + run: python -m pip install -r requirements.txt + - name: Run tests + run: python -m unittest discover -s segment \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7943ec53..b856542a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,9 @@ dist *.egg-info dist MANIFEST -build \ No newline at end of file +build +.eggs +*.bat +.vscode/ +.idea/ +.python-version diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 00000000..4712a015 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,548 @@ +[MAIN] + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Files or directories to be skipped. They should be base names, not +# paths. +ignore=CVS + +# Add files or directories matching the regex patterns to the ignore-list. The +# regex matches against paths and can be in Posix or Windows format. +ignore-paths= + +# Files or directories matching the regex patterns are skipped. The regex +# matches against base names, not paths. +ignore-patterns=^\.# + +# Pickle collected data for later comparisons. +persistent=yes + +# List of plugins (as comma separated values of python modules names) to load, +# usually to register additional checkers. +load-plugins= + pylint.extensions.check_elif, + pylint.extensions.bad_builtin, + pylint.extensions.docparams, + pylint.extensions.for_any_all, + pylint.extensions.set_membership, + pylint.extensions.code_style, + pylint.extensions.overlapping_exceptions, + pylint.extensions.typing, + pylint.extensions.redefined_variable_type, + pylint.extensions.comparison_placement, + pylint.extensions.broad_try_clause, + pylint.extensions.dict_init_mutate, + pylint.extensions.consider_refactoring_into_while_condition, + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use. 
+jobs=1 + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code +extension-pkg-allow-list= + +# Minimum supported python version +py-version = 3.8.0 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# Specify a score threshold under which the program will exit with error. +fail-under=10.0 + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint in +# a server-like mode. +clear-cache-post-run=no + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED +# confidence= + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + use-symbolic-message-instead, + useless-suppression, + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once).You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use"--disable=all --enable=classes +# --disable=W" + +disable= + attribute-defined-outside-init, + invalid-name, + missing-docstring, + protected-access, + too-few-public-methods, + # handled by black + format, + # We anticipate #3512 where it will become optional + fixme, + consider-using-assignment-expr, + + +[REPORTS] + +# Set the output format. Available formats are text, parseable, colorized, msvs +# (visual studio) and html. You can also give a reporter class, eg +# mypackage.mymodule.MyReporterClass. +output-format=text + +# Tells whether to display a full report or only the messages +reports=no + +# Python expression which should return a note less than 10 (10 is the highest +# note). You have access to the variables 'fatal', 'error', 'warning', 'refactor', 'convention' +# and 'info', which contain the number of messages in each category, as +# well as 'statement', which is the total number of statements analyzed. This +# score is used by the global evaluation report (RP0004). 
+evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details +#msg-template= + +# Activate the evaluation score. +score=yes + + +[LOGGING] + +# Logging modules to check that the string format arguments are in logging +# function parameter format +logging-modules=logging + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME,XXX,TODO + +# Regular expression of note tags to take in consideration. +#notes-rgx= + + +[SIMILARITIES] + +# Minimum lines number of a similarity. +min-similarity-lines=6 + +# Ignore comments when computing similarities. +ignore-comments=yes + +# Ignore docstrings when computing similarities. +ignore-docstrings=yes + +# Ignore imports when computing similarities. +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + + +[VARIABLES] + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_,_cb + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io + + +[FORMAT] + +# Maximum number of characters on a single line. +max-line-length=100 + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Maximum number of lines in a module +max-module-lines=2000 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + + +[BASIC] + +# Good variable names which should always be accepted, separated by a comma +good-names=i,j,k,ex,Run,_ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Bad variable names which should always be refused, separated by a comma +bad-names=foo,bar,baz,toto,tutu,tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. 
+name-group= + +# Include a hint for the correct naming format with invalid-name +include-naming-hint=no + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names +function-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names +variable-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names +const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$ + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names +attr-rgx=[a-z_][a-z0-9_]{2,}$ + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names +argument-rgx=[a-z_][a-z0-9_]{2,30}$ + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names +class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. +#class-const-rgx= + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names +inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names +class-rgx=[A-Z_][a-zA-Z0-9]+$ + + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names +module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ + + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names +method-rgx=[a-z_][a-z0-9_]{2,}$ + +# Regular expression matching correct type variable names +#typevar-rgx= + +# Regular expression which should only match function or class names that do +# not require a docstring. Use ^(?!__init__$)_ to also check __init__. +no-docstring-rgx=__.*__ + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# List of decorators that define properties, such as abc.abstractproperty. +property-classes=abc.abstractproperty + + +[TYPECHECK] + +# Regex pattern to define which classes are considered mixins if ignore-mixin- +# members is set to 'yes' +mixin-class-rgx=.*MixIn + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis). It +# supports qualified module names, as well as Unix pattern matching. +ignored-modules= + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. 
+generated-members=REQUEST,acl_users,aq_parent,argparse.Namespace + +# List of decorators that create context managers from functions, such as +# contextlib.contextmanager. +contextmanager-decorators=contextlib.contextmanager + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +[SPELLING] + +# Spelling dictionary name. Available dictionaries: none. To make it working +# install python-enchant package. +spelling-dict= + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# List of comma separated words that should be considered directives if they +# appear and the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:,pragma:,# noinspection + +# A path to a file that contains private dictionary; one word per line. +spelling-private-dict-file=.pyenchant_pylint_custom_dict.txt + +# Tells whether to store unknown words to indicated private dictionary in +# --spelling-private-dict-file option instead of raising a message. +spelling-store-unknown-words=no + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=2 + + +[DESIGN] + +# Maximum number of arguments for function / method +max-args = 9 + +# Maximum number of locals for function / method body +max-locals = 19 + +# Maximum number of return / yield for function / method body +max-returns=11 + +# Maximum number of branch for function / method body +max-branches = 20 + +# Maximum number of statements in function / method body +max-statements = 50 + +# Maximum number of attributes for a class (see R0902). +max-attributes=11 + +# Maximum number of statements in a try-block +max-try-statements = 7 + +[CLASSES] + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__,__new__,setUp,__post_init__ + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow wildcard imports from modules that define __all__. 
+allow-wildcard-with-all=no + +# Allow explicit reexports by alias from a package __init__. +allow-reexport-from-package=no + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Deprecated modules which should not be used, separated by a comma +deprecated-modules=regsub,TERMIOS,Bastion,rexec + +# Create a graph of every (i.e. internal and external) dependencies in the +# given file (report RP0402 must not be disabled) +import-graph= + +# Create a graph of external dependencies in the given file (report RP0402 must +# not be disabled) +ext-import-graph= + +# Create a graph of internal dependencies in the given file (report RP0402 must +# not be disabled) +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when being caught. Defaults to +# "Exception" +overgeneral-exceptions=builtins.Exception + + +[TYPING] + +# Set to ``no`` if the app / library does **NOT** need to support runtime +# introspection of type annotations. If you use type annotations +# **exclusively** for type checking of an application, you're probably fine. +# For libraries, evaluate if some users what to access the type hints at +# runtime first, e.g., through ``typing.get_type_hints``. Applies to Python +# versions 3.7 - 3.9 +runtime-typing = no + + +[DEPRECATED_BUILTINS] + +# List of builtins function names that should not be used, separated by a comma +bad-functions=map,input + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[CODE_STYLE] + +# Max line length for which to sill emit suggestions. Used to prevent optional +# suggestions which would get split by a code formatter (e.g., black). Will +# default to the setting for ``max-line-length``. +#max-line-length-suggestions= \ No newline at end of file diff --git a/HISTORY.md b/HISTORY.md new file mode 100644 index 00000000..613a38c3 --- /dev/null +++ b/HISTORY.md @@ -0,0 +1,184 @@ +# 2.3.5 / 2025-11-18 +- Fix for Github Issue #516 +- Fix for Github Issue #515 +- Fix for Github Issue #493 +- Updated the version to 2.3.5 +- Enhanced retry logic in segment/analytics/consumer.py by adding detailed debug logs for each retry attempt and logging an error when all retries are exhausted. +- Moved the return success out of the finally block in segment/analytics/consumer.py at Line 84. 
+- Removed redundant error logging in segment/analytics/request.py to avoid duplicate logs when exceptions are raised. + +# 2.3.4 / 2025-2-24 +- Fix for proxy values not being used + +# 2.3.3 / 2024-10-07 +- Update time handling and OAuth + +# 2.3.2 / 2024-02-15 +- Updating version to create a release wheel without the outdated /analytics files + +# 2.3.1 / 2024-02-07 +- Fixing dependency for JWT + +# 2.3.0 / 2024-01-29 +- OAuth 2.0 support +- Adding Python 3.10 and 3.11 classifiers by @mvinogradov-wavefin +- Update from monotonic to time module by @rudyardrichter +- Correct HISTORY header for most recent release by @DavidCain + +# 2.2.3 / 2023-06-12 +- Update project to use GitHub Actions +- Support for Python 3.10 and 3.11 +- Return values for function calls via the proxy + +# 2.2.2 / 2022-11-29 +- Specifying milliseconds as the isoformat rather than the default microseconds in timestamp + +# 2.2.1 / 2022-06-23 +- Empty Catch fix #217 +- Build Isolation fix #216 +- Removing remaining string_type references + +# 2.2.0 / 2022-03-07 +- Remove Python 2 support +- Remove six package + +# 2.1.0 / 2022-03-04 + +- Raise exception on large message +- Automatically coerce Enum values inside messages +- Handle exceptions in the try catch and log them + + +# 2.0.0 / 2021-10-01 + +- Update package name and namespace name + + +# 1.5.0 / 2021-09-23 +- Update tests with latest dependencies +- Remove unsupported python versions 2.7 & 3.5 + +# 1.4.0 / 2021-07-16 +- Fix the missing `upload_size` parameter + +# 1.3.1 / 2021-05-12 + +- Fix linting code and readme heling basic things. +- Add support for HTTP proxy +- Allows more settings to be configured from singleton + +# 1.3.0-beta1 / 2019-04-27 + +- Add `sync_mode` option ([#147](https://github.com/segmentio/analytics-python/pull/147)) + +# 1.3.0-beta0 / 2018-10-10 + +- Add User-Agent header to messages +- Don't retry sending on client errors except 429 +- Allow user-defined upload interval +- Add `shutdown` function +- Add gzip support +- Add exponential backoff with jitter when retrying +- Add a parameter in Client to configure max retries +- Limit batch upload size to 500KB +- Drop messages greater than 32kb +- Allow user-defined upload size +- Support custom messageId + +# 1.2.9 / 2017-11-28 + +- [Fix](https://github.com/segmentio/analytics-python/pull/102): Stringify non-string userIds and anonymousIds. + +# 1.2.8 / 2017-09-20 + +- [Fix](https://github.com/segmentio/analytics-python/issues/94): Date objects are removed from event properties. +- [Fix](https://github.com/segmentio/analytics-python/pull/98): Fix for regression introduced in version 1.2.4. + +# 1.2.7 / 2017-01-31 + +- [Fix](https://github.com/segmentio/analytics-python/pull/92): Correctly serialize date objects. 
+ +# 1.2.6 / 2016-12-07 + +- don't add messages to the queue if send is false +- drop py32 support + +# 1.2.5 / 2016-07-02 + +- Fix outdated python-dateutil<2 requirement for python2 - dateutil > 2.1 runs is python2 compatible +- Fix a bug introduced in 1.2.4 where we could try to join a thread that was not yet started + +# 1.2.4 / 2016-06-06 + +- Fix race conditions in overflow and flush tests +- Join daemon thread on interpreter exit to prevent value errors +- Capitalize HISTORY.md (#76) +- Quick fix for Decimal to send as a float + +# 1.2.3 / 2016-03-23 + +- relaxing requests dep + +# 1.2.2 / 2016-03-17 + +- Fix environment markers definition +- Use proper way for defining conditional dependencies + +# 1.2.1 / 2016-03-11 + +- fixing requirements.txt + +# 1.2.0 / 2016-03-11 + +- adding versioned requirements.txt file + +# 1.1.0 / 2015-06-23 + +- Adding fixes for handling invalid json types +- Fixing byte/bytearray handling +- Adding `logging.DEBUG` fix for `setLevel` +- Support HTTP keep-alive using a Session connection pool +- Suppport universal wheels +- adding .sentAt +- make it really testable +- fixing overflow test +- removing .io's +- Update README.md +- spacing + +# 1.0.3 / 2014-09-30 + +- adding top level send option + +# 1.0.2 / 2014-09-17 + +- fixing debug logging levels + +# 1.0.1 / 2014-09-08 + +- fixing Unicode handling, for write_key and events +- adding six to requirements.txt and install scripts + +# 1.0.0 / 2014-09-05 + +- updating to spec 1.0 +- adding python3 support +- moving to analytics.write_key API +- moving consumer to a separate thread +- adding request retries +- making analytics.flush() synchronous +- adding full Travis tests + +# 0.4.4 / 2013-11-21 + +- add < python 2.7 compatibility by removing `delta.total_seconds` + +# 0.4.3 / 2013-11-13 + +- added datetime serialization fix (alexlouden) + +# 0.4.2 / 2013-06-26 + +- Added history.d change log +- Merging https://github.com/segmentio/analytics-python/pull/14 to add support for lists and PEP8 fixes. Thanks https://github.com/dfee! +- Fixing #12, adding static public API to analytics.**init** diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..c29156af --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Segment (segment.com) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..cee88eb5 --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +install: + pip install --edit .[test] + +test: + pylint --rcfile=.pylintrc --reports=y --exit-zero analytics | tee pylint.out + flake8 --max-complexity=10 --statistics analytics > flake8.out || true + +release: + python setup.py sdist bdist_wheel + twine upload dist/* + +e2e_test: + .buildscripts/e2e.sh + +.PHONY: test release e2e_test diff --git a/README.md b/README.md index 86824191..fdcd72cf 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,89 @@ analytics-python ============== -analytics-python is a python client for [Segment.io](https://segment.io) +======= +[![Run Python Tests](https://github.com/North-Two-Five/analytics-python/actions/workflows/main.yml/badge.svg)](https://github.com/North-Two-Five/analytics-python/actions/workflows/main.yml) +[![.github/workflows/tests.yml](https://github.com/North-Two-Five/analytics-python/actions/workflows/tests.yml/badge.svg)](https://github.com/North-Two-Five/analytics-python/actions/workflows/tests.yml) +======= + + + +analytics-python is a python client for [Segment](https://segment.com) + +### ⚠️ Maintenance ⚠️ +This library is in maintenance mode. It will send data as intended, but receive no new feature support and only critical maintenance updates from Segment. + +
+
+*You can't fix what you can't measure*
+
+ +Analytics helps you measure your users, product, and business. It unlocks insights into your app's funnel, core business metrics, and whether you have a product-market fit. + +## 🚀 How to get started +1. **Collect analytics data** from your app(s). + - The top 200 Segment companies collect data from 5+ source types (web, mobile, server, CRM, etc.). +2. **Send the data to analytics tools** (for example, Google Analytics, Amplitude, Mixpanel). + - Over 250+ Segment companies send data to eight categories of destinations such as analytics tools, warehouses, email marketing, and remarketing systems, session recording, and more. +3. **Explore your data** by creating metrics (for example, new signups, retention cohorts, and revenue generation). + - The best Segment companies use retention cohorts to measure product-market fit. Netflix has 70% paid retention after 12 months, 30% after 7 years. + +[Segment](https://segment.com) collects analytics data and allows you to send it to more than 250 apps (such as Google Analytics, Mixpanel, Optimizely, Facebook Ads, Slack, Sentry) just by flipping a switch. You only need one Segment code snippet, and you can turn integrations on and off at will, with no additional code. [Sign up with Segment today](https://app.segment.com/signup). + +### 🤔 Why? +1. **Power all your analytics apps with the same data**. Instead of writing code to integrate all of your tools individually, send data to Segment, once. + +2. **Install tracking for the last time**. We're the last integration you'll ever need to write. You only need to instrument Segment once. Reduce all of your tracking code and advertising tags into a single set of API calls. + +3. **Send data from anywhere**. Send Segment data from any device, and we'll transform and send it on to any tool. + +4. **Query your data in SQL**. Slice, dice, and analyze your data in detail with Segment SQL. We'll transform and load your customer behavioral data directly from your apps into Amazon Redshift, Google BigQuery, or Postgres. Save weeks of engineering time by not having to invent your data warehouse and ETL pipeline. + + For example, you can capture data on any app: + ```python + analytics.track('Order Completed', { price: 99.84 }) + ``` + Then, query the resulting data in SQL: + ```sql + select * from app.order_completed + order by price desc + ``` + +## 👨‍💻 Getting Started + +Install `segment-analytics-python` using pip: + +```bash +pip3 install segment-analytics-python +``` + +or you can clone this repo: +```bash +git clone https://github.com/segmentio/analytics-python.git + +cd analytics-python + +sudo python3 setup.py install +``` + +Now inside your app, you'll want to **set your** `write_key` before making any analytics calls: + +```python +import segment.analytics as analytics + +analytics.write_key = 'YOUR_WRITE_KEY' +``` +**Note** If you need to send data to multiple Segment sources, you can initialize a new Client for each `write_key` + +### 🚀 Startup Program +
+
+If you are part of a new startup (<$5M raised, <2 years since founding), we just launched a new startup program for you. You can get a Segment Team plan (up to $25,000 value in Segment credits) for free up to 2 years — apply here! ## Documentation -Documentation is available at [https://segment.io/libraries/python](https://segment.io/libraries/python). +Documentation is available at [https://segment.com/libraries/python](https://segment.com/libraries/python). ## License @@ -24,10 +102,12 @@ WWWWWW||WWWWWW (The MIT License) -Copyright (c) 2013 Segment.io Inc. +Copyright (c) 2013 Segment Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +[![forthebadge](https://forthebadge.com/images/badges/built-with-love.svg)](https://forthebadge.com) diff --git a/RELEASING.md b/RELEASING.md new file mode 100644 index 00000000..e141a4ae --- /dev/null +++ b/RELEASING.md @@ -0,0 +1,9 @@ +Releasing +========= + +1. Update `VERSION` in `segment/analytics/version.py` to the new version. +2. Update the `HISTORY.md` for the impending release. +3. `git commit -am "Release X.Y.Z."` (where X.Y.Z is the new version) +4. `git tag -a X.Y.Z -m "Version X.Y.Z"` (where X.Y.Z is the new version). +5. `git push && git push --tags` +6. `make release`. diff --git a/analytics/__init__.py b/analytics/__init__.py deleted file mode 100644 index 0b99be95..00000000 --- a/analytics/__init__.py +++ /dev/null @@ -1,59 +0,0 @@ - -import version - -VERSION = version.VERSION -__version__ = VERSION - -import sys -this_module = sys.modules[__name__] - -from stats import Statistics -stats = Statistics() -methods = ['identify', 'track', 'alias', 'flush', 'on_success', 'on_failure'] - -def uninitialized(*args, **kwargs): - print >>sys.stderr, 'Please call analytics.init(secret) before calling analytics methods.' - -for method in methods: - setattr(this_module, method, uninitialized) - - -def init(secret, **kwargs): - """Create a default instance of a analytics-python client - - :param str secret: The Segment.io API Secret - - Kwargs: - - :param logging.LOG_LEVEL log_level: The logging log level for the client - talks to. Use log_level=logging.DEBUG to troubleshoot - : param bool log: False to turn off logging completely, True by default - : param int flush_at: Specicies after how many messages the client will flush - to the server. Use flush_at=1 to disable batching - : param datetime.timedelta flush_after: Specifies after how much time - of no flushing that the server will flush. 
Used in conjunction with - the flush_at size policy - : param bool async: True to have the client flush to the server on another - thread, therefore not blocking code (this is the default). False to - enable blocking and making the request on the calling thread. - - """ - from client import Client - - # if we have already initialized, no-op - if hasattr(this_module, 'default_client'): - return - - default_client = Client(secret=secret, stats=stats, **kwargs) - - setattr(this_module, 'default_client', default_client) - - def proxy(method): - def proxy_to_default_client(*args, **kwargs): - func = getattr(default_client, method) - return func(*args, **kwargs) - - setattr(this_module, method, proxy_to_default_client) - - for method in methods: - proxy(method) diff --git a/analytics/client.py b/analytics/client.py deleted file mode 100644 index 2701f88a..00000000 --- a/analytics/client.py +++ /dev/null @@ -1,493 +0,0 @@ -import collections -from datetime import datetime, timedelta -import json -import logging -import numbers -import threading - -from dateutil.tz import tzutc -import requests - -from stats import Statistics -from errors import ApiError -from utils import guess_timezone - -import options - - -logging_enabled = True -logger = logging.getLogger('analytics') - - -def log(level, *args, **kwargs): - if logging_enabled: - method = getattr(logger, level) - method(*args, **kwargs) - - -def package_exception(client, data, e): - log('warn', 'Segment.io request error', exc_info=True) - client._on_failed_flush(data, e) - - -def package_response(client, data, response): - # TODO: reduce the complexity (mccabe) - if response.status_code == 200: - client._on_successful_flush(data, response) - elif response.status_code == 400: - content = response.text - try: - body = json.loads(content) - - code = 'bad_request' - message = 'Bad request' - - if 'error' in body: - error = body.error - - if 'code' in error: - code = error['code'] - - if 'message' in error: - message = error['message'] - - client._on_failed_flush(data, ApiError(code, message)) - - except Exception: - client._on_failed_flush(data, ApiError('Bad Request', content)) - else: - client._on_failed_flush(data, - ApiError(response.status_code, response.text)) - - -def request(client, url, data): - - log('debug', 'Sending request to Segment.io ...') - try: - - response = requests.post(url, - data=json.dumps(data), - headers={'content-type': 'application/json'}, - timeout=client.timeout) - - log('debug', 'Finished Segment.io request.') - - package_response(client, data, response) - - return response.status_code == 200 - - except requests.ConnectionError as e: - package_exception(client, data, e) - except requests.Timeout as e: - package_exception(client, data, e) - - return False - - -class FlushThread(threading.Thread): - - def __init__(self, client): - threading.Thread.__init__(self) - self.client = client - - def run(self): - log('debug', 'Flushing thread running ...') - - self.client._sync_flush() - - log('debug', 'Flushing thread done.') - - -class Client(object): - """The Client class is a batching asynchronous python wrapper over the - Segment.io API. 
- - """ - - def __init__(self, secret=None, log_level=logging.INFO, log=True, - flush_at=20, flush_after=timedelta(0, 10), - async=True, max_queue_size=10000, stats=Statistics(), - timeout=10, send=True): - """Create a new instance of a analytics-python Client - - :param str secret: The Segment.io API secret - :param logging.LOG_LEVEL log_level: The logging log level for the - client talks to. Use log_level=logging.DEBUG to troubleshoot - : param bool log: False to turn off logging completely, True by default - : param int flush_at: Specicies after how many messages the client will - flush to the server. Use flush_at=1 to disable batching - : param datetime.timedelta flush_after: Specifies after how much time - of no flushing that the server will flush. Used in conjunction with - the flush_at size policy - : param bool async: True to have the client flush to the server on - another thread, therefore not blocking code (this is the default). - False to enable blocking and making the request on the calling thread. - : param float timeout: Number of seconds before timing out request to - Segment.io - : param bool send: True to send requests, False to not send. False to - turn analytics off (for testing). - """ - - self.secret = secret - - self.queue = collections.deque() - self.last_flushed = None - - if not log: - # TODO: logging_enabled is assigned, but not used - logging_enabled = False - # effectively disables the logger - logger.setLevel(logging.CRITICAL) - else: - logger.setLevel(log_level) - - self.async = async - - self.max_queue_size = max_queue_size - self.max_flush_size = 50 - - self.flush_at = flush_at - self.flush_after = flush_after - - self.timeout = timeout - - self.stats = stats - - self.flush_lock = threading.Lock() - self.flushing_thread = None - - self.send = send - - self.success_callbacks = [] - self.failure_callbacks = [] - - def set_log_level(self, level): - """Sets the log level for analytics-python - - :param logging.LOG_LEVEL level: The level at which analytics-python log - should talk at - """ - logger.setLevel(level) - - def _check_for_secret(self): - if not self.secret: - raise Exception('Please set analytics.secret before calling ' + - 'identify or track.') - - def _coerce_unicode(self, cmplx): - return unicode(cmplx) - - def _clean_list(self, l): - return [self._clean(item) for item in l] - - def _clean_dict(self, d): - data = {} - for k, v in d.iteritems(): - try: - data[k] = self._clean(v) - except TypeError: - log('warn', 'Dictionary values must be serializeable to ' + - 'JSON "%s" value %s of type %s is unsupported.' - % (k, v, type(v))) - return data - - def _clean(self, item): - if isinstance(item, (str, unicode, int, long, float, bool, - numbers.Number, datetime)): - return item - elif isinstance(item, (set, list, tuple)): - return self._clean_list(item) - elif isinstance(item, dict): - return self._clean_dict(item) - else: - return self._coerce_unicode(item) - - def on_success(self, callback): - self.success_callbacks.append(callback) - - def on_failure(self, callback): - self.failure_callbacks.append(callback) - - def identify(self, user_id=None, traits={}, context={}, timestamp=None): - # TODO: reduce the complexity (mccabe) - """Identifying a user ties all of their actions to an id, and - associates user traits to that id. - - :param str user_id: the user's id after they are logged in. It's the - same id as which you would recognize a signed-in user in your system. - - : param dict traits: a dictionary with keys like subscriptionPlan or - age. 
You only need to record a trait once, no need to send it again. - Accepted value types are string, boolean, ints,, longs, and - datetime.datetime. - - : param dict context: An optional dictionary with additional - information thats related to the visit. Examples are userAgent, and IP - address of the visitor. - - : param datetime.datetime timestamp: If this event happened in the - past, the timestamp can be used to designate when the identification - happened. Careful with this one, if it just happened, leave it None. - If you do choose to provide a timestamp, make sure it has a timezone. - """ - - self._check_for_secret() - - if not user_id: - raise Exception('Must supply a user_id.') - - if traits is not None and not isinstance(traits, dict): - raise Exception('Traits must be a dictionary.') - - if context is not None and not isinstance(context, dict): - raise Exception('Context must be a dictionary.') - - if timestamp is None: - timestamp = datetime.utcnow().replace(tzinfo=tzutc()) - elif not isinstance(timestamp, datetime): - raise Exception('Timestamp must be a datetime object.') - else: - timestamp = guess_timezone(timestamp) - - cleaned_traits = self._clean(traits) - - action = {'userId': user_id, - 'traits': cleaned_traits, - 'context': context, - 'timestamp': timestamp.isoformat(), - 'action': 'identify'} - - context['library'] = 'analytics-python' - - if self._enqueue(action): - self.stats.identifies += 1 - - def track(self, user_id=None, event=None, properties={}, context={}, - timestamp=None): - # TODO: reduce the complexity (mccabe) - """Whenever a user triggers an event, you'll want to track it. - - :param str user_id: the user's id after they are logged in. It's the - same id as which you would recognize a signed-in user in your system. - - :param str event: The event name you are tracking. It is recommended - that it is in human readable form. For example, "Bought T-Shirt" - or "Started an exercise" - - :param dict properties: A dictionary with items that describe the - event in more detail. This argument is optional, but highly recommended - - you'll find these properties extremely useful later. Accepted value - types are string, boolean, ints, doubles, longs, and datetime.datetime. - - :param dict context: An optional dictionary with additional information - thats related to the visit. Examples are userAgent, and IP address - of the visitor. - - :param datetime.datetime timestamp: If this event happened in the past, - the timestamp can be used to designate when the identification - happened. Careful with this one, if it just happened, leave it None. - If you do choose to provide a timestamp, make sure it has a timezone. 
- - """ - - self._check_for_secret() - - if not user_id: - raise Exception('Must supply a user_id.') - - if not event: - raise Exception('Event is a required argument as a non-empty ' + - 'string.') - - if properties is not None and not isinstance(properties, dict): - raise Exception('Context must be a dictionary.') - - if context is not None and not isinstance(context, dict): - raise Exception('Context must be a dictionary.') - - if timestamp is None: - timestamp = datetime.utcnow().replace(tzinfo=tzutc()) - elif not isinstance(timestamp, datetime): - raise Exception('Timestamp must be a datetime.datetime object.') - else: - timestamp = guess_timezone(timestamp) - - cleaned_properties = self._clean(properties) - - action = {'userId': user_id, - 'event': event, - 'context': context, - 'properties': cleaned_properties, - 'timestamp': timestamp.isoformat(), - 'action': 'track'} - - context['library'] = 'analytics-python' - - if self._enqueue(action): - self.stats.tracks += 1 - - def alias(self, from_id, to_id, context={}, timestamp=None): - # TODO: reduce the complexity (mccabe) - """Aliases an anonymous user into an identified user - - :param str from_id: the anonymous user's id before they are logged in - - :param str to_id: the identified user's id after they're logged in - - :param dict context: An optional dictionary with additional information - thats related to the visit. Examples are userAgent, and IP address - of the visitor. - - :param datetime.datetime timestamp: If this event happened in the past, - the timestamp can be used to designate when the identification - happened. Careful with this one, if it just happened, leave it None. - If you do choose to provide a timestamp, make sure it has a timezone. - """ - - self._check_for_secret() - - if not from_id: - raise Exception('Must supply a from_id.') - - if not to_id: - raise Exception('Must supply a to_id.') - - if context is not None and not isinstance(context, dict): - raise Exception('Context must be a dictionary.') - - if timestamp is None: - timestamp = datetime.utcnow().replace(tzinfo=tzutc()) - elif not isinstance(timestamp, datetime): - raise Exception('Timestamp must be a datetime.datetime object.') - else: - timestamp = guess_timezone(timestamp) - - action = {'from': from_id, - 'to': to_id, - 'context': context, - 'timestamp': timestamp.isoformat(), - 'action': 'alias'} - - context['library'] = 'analytics-python' - - if self._enqueue(action): - self.stats.aliases += 1 - - def _should_flush(self): - """ Determine whether we should sync """ - - full = len(self.queue) >= self.flush_at - stale = self.last_flushed is None - - if not stale: - stale = datetime.now() - self.last_flushed > self.flush_after - - return full or stale - - def _enqueue(self, action): - - # if we've disabled sending, just return False - if not self.send: - return False - - submitted = False - - if len(self.queue) < self.max_queue_size: - self.queue.append(action) - - self.stats.submitted += 1 - - submitted = True - - log('debug', 'Enqueued ' + action['action'] + '.') - - else: - log('warn', 'analytics-python queue is full') - - if self._should_flush(): - self.flush() - - return submitted - - def _on_successful_flush(self, data, response): - if 'batch' in data: - for item in data['batch']: - self.stats.successful += 1 - for callback in self.success_callbacks: - callback(data, response) - - def _on_failed_flush(self, data, error): - if 'batch' in data: - for item in data['batch']: - self.stats.failed += 1 - for callback in self.failure_callbacks: - 
callback(data, error) - - def _flush_thread_is_free(self): - return self.flushing_thread is None \ - or not self.flushing_thread.is_alive() - - def flush(self, async=None): - """ Forces a flush from the queue to the server """ - - flushing = False - - # if the async arg is provided, it overrides the client's settings - if async is None: - async = self.async - - if async: - # We should asynchronously flush on another thread - with self.flush_lock: - - if self._flush_thread_is_free(): - - log('debug', 'Initiating asynchronous flush ..') - - self.flushing_thread = FlushThread(self) - self.flushing_thread.start() - - flushing = True - - else: - log('debug', 'The flushing thread is still active.') - else: - - # Flushes on this thread - log('debug', 'Initiating synchronous flush ..') - self._sync_flush() - flushing = True - - if flushing: - self.last_flushed = datetime.now() - self.stats.flushes += 1 - - return flushing - - def _sync_flush(self): - - log('debug', 'Starting flush ..') - - successful = 0 - failed = 0 - - url = options.host + options.endpoints['batch'] - - while len(self.queue) > 0: - - batch = [] - for i in range(self.max_flush_size): - if len(self.queue) == 0: - break - - batch.append(self.queue.pop()) - - payload = {'batch': batch, 'secret': self.secret} - - if request(self, url, payload): - successful += len(batch) - else: - failed += len(batch) - - log('debug', 'Successfully flushed {0} items [{1} failed].'. - format(str(successful), str(failed))) diff --git a/analytics/errors.py b/analytics/errors.py deleted file mode 100644 index ca645edb..00000000 --- a/analytics/errors.py +++ /dev/null @@ -1,13 +0,0 @@ - - -class ApiError(Exception): - - def __init__(self, code, message): - self.code = code - self.message = message - - def __repr__(self): - return self.__str__() - - def __str__(self): - return repr(self.message) diff --git a/analytics/options.py b/analytics/options.py deleted file mode 100644 index 326b38ad..00000000 --- a/analytics/options.py +++ /dev/null @@ -1,9 +0,0 @@ - -host = 'https://api.segment.io' - -endpoints = { - 'track': '/v1/track', - 'identify': '/v1/identify', - 'alias': '/v1/alias', - 'batch': '/v1/import' -} diff --git a/analytics/stats.py b/analytics/stats.py deleted file mode 100644 index 096e5310..00000000 --- a/analytics/stats.py +++ /dev/null @@ -1,22 +0,0 @@ - - -class Statistics(object): - - def __init__(self): - # The number of submitted identifies/tracks - self.submitted = 0 - - # The number of identifies submitted - self.identifies = 0 - # The number of tracks submitted - self.tracks = 0 - # The number of aliases - self.aliases = 0 - - # The number of actions to be successful - self.successful = 0 - # The number of actions to fail - self.failed = 0 - - # The number of flushes to happen - self.flushes = 0 diff --git a/analytics/utils.py b/analytics/utils.py deleted file mode 100644 index a955b7b5..00000000 --- a/analytics/utils.py +++ /dev/null @@ -1,26 +0,0 @@ - -from datetime import datetime -from dateutil.tz import tzlocal, tzutc - - -def is_naive(dt): - """ Determines if a given datetime.datetime is naive. 
""" - return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None - - -def guess_timezone(dt): - """ Attempts to convert a naive datetime to an aware datetime """ - if is_naive(dt): - # attempts to guess the datetime.datetime.now() local timezone - # case, and then defaults to utc - - delta = datetime.now() - dt - if delta.total_seconds() < 5: - # this was created using datetime.datetime.now() - # so we are in the local timezone - return dt.replace(tzinfo=tzlocal()) - else: - # at this point, the best we can do (I htink) is guess UTC - return dt.replace(tzinfo=tzutc()) - - return dt diff --git a/analytics/version.py b/analytics/version.py deleted file mode 100644 index 52f089c2..00000000 --- a/analytics/version.py +++ /dev/null @@ -1 +0,0 @@ -VERSION = '0.4.1' diff --git a/e2e_test.sh b/e2e_test.sh new file mode 100755 index 00000000..3828abe2 --- /dev/null +++ b/e2e_test.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +set -e + +python ./simulator.py "$@" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..308a4e87 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +backoff==2.2.1 +cryptography==44.0.0 +flake8==7.1.1 +mock==2.0.0 +pylint==3.3.3 +PyJWT==2.10.1 +python-dateutil==2.8.2 +requests==2.32.3 diff --git a/samples/oauth.py b/samples/oauth.py new file mode 100644 index 00000000..bf1cc70d --- /dev/null +++ b/samples/oauth.py @@ -0,0 +1,52 @@ +import sys +import os +sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) +import time +import segment.analytics as analytics + +privatekey = '''-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDVll7uJaH322IN +PQsH2aOXZJ2r1q+6hpVK1R5JV1p41PUzn8pOxyXFHWB+53dUd4B8qywKS36XQjp0 +VmhR1tQ22znQ9ZCM6y4LGeOJBjAZiFZLcGQNNrDFC0WGWTrK1ZTS2K7p5qy4fIXG +laNkMXiGGCawkgcHAdOvPTy8m1d9a6YSetYVmBP/tEYN95jPyZFIoHQfkQPBPr9W +cWPpdEBzasHV5d957akjurPpleDiD5as66UW4dkWXvS7Wu7teCLCyDApcyJKTb2Z +SXybmWjhIZuctZMAx3wT/GgW3FbkGaW5KLQgBUMzjpL0fCtMatlqckMD92ll1FuK +R+HnXu05AgMBAAECggEBAK4o2il4GDUh9zbyQo9ZIPLuwT6AZXRED3Igi3ykNQp4 +I6S/s9g+vQaY6LkyBnSiqOt/K/8NBiFSiJWaa5/n+8zrP56qzf6KOlYk+wsdN5Vq +PWtwLrUzljpl8YAWPEFunNa8hwwE42vfZbnDBKNLT4qQIOQzfnVxQOoQlfj49gM2 +iSrblvsnQTyucFy3UyTeioHbh8q2Xqcxry5WUCOrFDd3IIwATTpLZGw0IPeuFJbJ +NfBizLEcyJaM9hujQU8PRCRd16MWX+bbYM6Mh4dkT40QXWsVnHBHwsgPdQgDgseF +Na4ajtHoC0DlwYCXpCm3IzJfKfq/LR2q8NDUgKeF4AECgYEA9nD4czza3SRbzhpZ +bBoK77CSNqCcMAqyuHB0hp/XX3yB7flF9PIPb2ReO8wwmjbxn+bm7PPz2Uwd2SzO +pU+FXmyKJr53Jxw/hmDWZCoh42gsGDlVqpmytzsj74KlaYiMyZmEGbD7t/FGfNGV +LdLDJaHIYxEimFviOTXKCeKvPAECgYEA3d8tv4jdp1uAuRZiU9Z/tfw5mJOi3oXF +8AdFFDwaPzcTorEAxjrt9X6IjPbLIDJNJtuXYpe+dG6720KyuNnhLhWW9oZEJTwT +dUgqZ2fTCOS9uH0jSn+ZFlgTWI6UDQXRwE7z8avlhMIrQVmPsttGTo7V6sQVtGRx +bNj2RSVekTkCgYAJvy4UYLPHS0jWPfSLcfw8vp8JyhBjVgj7gncZW/kIrcP1xYYe +yfQSU8XmV40UjFfCGz/G318lmP0VOdByeVKtCV3talsMEPHyPqI8E+6DL/uOebYJ +qUqINK6XKnOgWOY4kvnGillqTQCcry1XQp61PlDOmj7kB75KxPXYrj6AAQKBgQDa ++ixCv6hURuEyy77cE/YT/Q4zYnL6wHjtP5+UKwWUop1EkwG6o+q7wtiul90+t6ah +1VUCP9X/QFM0Qg32l0PBohlO0pFrVnG17TW8vSHxwyDkds1f97N19BOT8ZR5jebI +sKPfP9LVRnY+l1BWLEilvB+xBzqMwh2YWkIlWI6PMQKBgGi6TBnxp81lOYrxVRDj +/3ycRnVDmBdlQKFunvfzUBmG1mG/G0YHeVSUKZJGX7w2l+jnDwIA383FcUeA8X6A +l9q+amhtkwD/6fbkAu/xoWNl+11IFoxd88y2ByBFoEKB6UVLuCTSKwXDqzEZet7x +mDyRxq7ohIzLkw8b8buDeuXZ +-----END PRIVATE KEY----- +''' # Should be read from a file on disk which can be rotated out + +analytics.write_key = '' + +analytics.oauth_client_id = 'CLIENT_ID' # OAuth application ID from segment dashboard +analytics.oauth_client_key = privatekey # 
generated as a public/private key pair in PEM format from OpenSSL +analytics.oauth_key_id = 'KEY_ID' # From segment dashboard after uploading public key +analytics.oauth_scope = 'tracking_api:write' #'public_api:read_write' + +def on_error(error, items): + print("An error occurred: ", error) +analytics.debug = True +analytics.on_error = on_error + +analytics.track('AUser', 'track') +analytics.flush() + +time.sleep(3) \ No newline at end of file diff --git a/segment/analytics/__init__.py b/segment/analytics/__init__.py new file mode 100644 index 00000000..ef35f67f --- /dev/null +++ b/segment/analytics/__init__.py @@ -0,0 +1,93 @@ + +from segment.analytics.version import VERSION +from segment.analytics.client import Client + +__version__ = VERSION + +"""Settings.""" +write_key = Client.DefaultConfig.write_key +host = Client.DefaultConfig.host +on_error = Client.DefaultConfig.on_error +debug = Client.DefaultConfig.debug +log_handler = Client.DefaultConfig.log_handler +send = Client.DefaultConfig.send +sync_mode = Client.DefaultConfig.sync_mode +max_queue_size = Client.DefaultConfig.max_queue_size +gzip = Client.DefaultConfig.gzip +timeout = Client.DefaultConfig.timeout +max_retries = Client.DefaultConfig.max_retries + +"""Oauth Settings.""" +oauth_client_id = Client.DefaultConfig.oauth_client_id +oauth_client_key = Client.DefaultConfig.oauth_client_key +oauth_key_id = Client.DefaultConfig.oauth_key_id +oauth_auth_server = Client.DefaultConfig.oauth_auth_server +oauth_scope = Client.DefaultConfig.oauth_scope + +default_client = None + + +def track(*args, **kwargs): + """Send a track call.""" + return _proxy('track', *args, **kwargs) + + +def identify(*args, **kwargs): + """Send a identify call.""" + return _proxy('identify', *args, **kwargs) + + +def group(*args, **kwargs): + """Send a group call.""" + return _proxy('group', *args, **kwargs) + + +def alias(*args, **kwargs): + """Send a alias call.""" + return _proxy('alias', *args, **kwargs) + + +def page(*args, **kwargs): + """Send a page call.""" + return _proxy('page', *args, **kwargs) + + +def screen(*args, **kwargs): + """Send a screen call.""" + return _proxy('screen', *args, **kwargs) + + +def flush(): + """Tell the client to flush.""" + _proxy('flush') + + +def join(): + """Block program until the client clears the queue""" + _proxy('join') + + +def shutdown(): + """Flush all messages and cleanly shutdown the client""" + _proxy('flush') + _proxy('join') + + +def _proxy(method, *args, **kwargs): + """Create an analytics client if one doesn't exist and send to it.""" + global default_client + if not default_client: + default_client = Client(write_key, host=host, debug=debug, + max_queue_size=max_queue_size, + send=send, on_error=on_error, + gzip=gzip, max_retries=max_retries, + sync_mode=sync_mode, timeout=timeout, + oauth_client_id=oauth_client_id, + oauth_client_key=oauth_client_key, + oauth_key_id=oauth_key_id, + oauth_auth_server=oauth_auth_server, + oauth_scope=oauth_scope, + ) + + fn = getattr(default_client, method) + return fn(*args, **kwargs) diff --git a/segment/analytics/client.py b/segment/analytics/client.py new file mode 100644 index 00000000..0f8015cd --- /dev/null +++ b/segment/analytics/client.py @@ -0,0 +1,358 @@ +from datetime import datetime +from uuid import uuid4 +import logging +import numbers +import atexit +import json + +from dateutil.tz import tzutc +from segment.analytics.oauth_manager import OauthManager + +from segment.analytics.utils import guess_timezone, clean +from segment.analytics.consumer import 
Consumer, MAX_MSG_SIZE +from segment.analytics.request import post, DatetimeSerializer +from segment.analytics.version import VERSION + +import queue + +ID_TYPES = (numbers.Number, str) + + +class Client(object): + class DefaultConfig(object): + write_key = None + host = None + on_error = None + debug = False + log_handler = None + send = True + sync_mode = False + max_queue_size = 10000 + gzip = False + timeout = 15 + max_retries = 10 + proxies = None + thread = 1 + upload_interval = 0.5 + upload_size = 100 + oauth_client_id = None + oauth_client_key = None + oauth_key_id = None + oauth_auth_server = 'https://oauth2.segment.io' + oauth_scope = 'tracking_api:write' + + + """Create a new Segment client.""" + log = logging.getLogger('segment') + + def __init__(self, + write_key=DefaultConfig.write_key, + host=DefaultConfig.host, + debug=DefaultConfig.debug, + max_queue_size=DefaultConfig.max_queue_size, + send=DefaultConfig.send, + on_error=DefaultConfig.on_error, + gzip=DefaultConfig.gzip, + max_retries=DefaultConfig.max_retries, + sync_mode=DefaultConfig.sync_mode, + timeout=DefaultConfig.timeout, + proxies=DefaultConfig.proxies, + thread=DefaultConfig.thread, + upload_size=DefaultConfig.upload_size, + upload_interval=DefaultConfig.upload_interval, + log_handler=DefaultConfig.log_handler, + oauth_client_id=DefaultConfig.oauth_client_id, + oauth_client_key=DefaultConfig.oauth_client_key, + oauth_key_id=DefaultConfig.oauth_key_id, + oauth_auth_server=DefaultConfig.oauth_auth_server, + oauth_scope=DefaultConfig.oauth_scope,): + require('write_key', write_key, str) + + self.queue = queue.Queue(max_queue_size) + self.write_key = write_key + self.on_error = on_error + self.debug = debug + self.send = send + self.sync_mode = sync_mode + self.host = host + self.gzip = gzip + self.timeout = timeout + self.proxies = proxies + self.oauth_manager = None + if(oauth_client_id and oauth_client_key and oauth_key_id): + self.oauth_manager = OauthManager(oauth_client_id, oauth_client_key, oauth_key_id, + oauth_auth_server, oauth_scope, timeout, max_retries) + + if log_handler: + self.log.addHandler(log_handler) + + if debug: + self.log.setLevel(logging.DEBUG) + if not log_handler: + # default log handler does not print debug or info + self.log.addHandler(logging.StreamHandler()) + + if sync_mode: + self.consumers = None + else: + # On program exit, allow the consumer thread to exit cleanly. + # This prevents exceptions and a messy shutdown when the + # interpreter is destroyed before the daemon thread finishes + # execution. However, it is *not* the same as flushing the queue! + # To guarantee all messages have been delivered, you'll still need + # to call flush(). 
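# Hypothetical usage sketch (not part of this change): because the consumer runs as a
# daemon thread, an application should flush or shut down the client explicitly before
# exiting to guarantee delivery, as the comment above notes. The write key is a placeholder.
from segment.analytics.client import Client

client = Client('YOUR_WRITE_KEY')
client.track('user_123', 'Signed Up', {'plan': 'free'})
client.shutdown()  # flush() followed by join(): drains the queue and stops the consumers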
+ if send: + atexit.register(self.join) + for _ in range(thread): + self.consumers = [] + consumer = Consumer( + self.queue, write_key, host=host, on_error=on_error, + upload_size=upload_size, upload_interval=upload_interval, + gzip=gzip, retries=max_retries, timeout=timeout, + proxies=proxies, oauth_manager=self.oauth_manager, + ) + self.consumers.append(consumer) + + # if we've disabled sending, just don't start the consumer + if send: + consumer.start() + + def identify(self, user_id=None, traits=None, context=None, timestamp=None, + anonymous_id=None, integrations=None, message_id=None): + traits = traits or {} + context = context or {} + integrations = integrations or {} + require('user_id or anonymous_id', user_id or anonymous_id, ID_TYPES) + require('traits', traits, dict) + + msg = { + 'integrations': integrations, + 'anonymousId': anonymous_id, + 'timestamp': timestamp, + 'context': context, + 'type': 'identify', + 'userId': user_id, + 'traits': traits, + 'messageId': message_id, + } + + return self._enqueue(msg) + + def track(self, user_id=None, event=None, properties=None, context=None, + timestamp=None, anonymous_id=None, integrations=None, + message_id=None): + properties = properties or {} + context = context or {} + integrations = integrations or {} + require('user_id or anonymous_id', user_id or anonymous_id, ID_TYPES) + require('properties', properties, dict) + require('event', event, str) + + msg = { + 'integrations': integrations, + 'anonymousId': anonymous_id, + 'properties': properties, + 'timestamp': timestamp, + 'context': context, + 'userId': user_id, + 'type': 'track', + 'event': event, + 'messageId': message_id, + } + + return self._enqueue(msg) + + def alias(self, previous_id=None, user_id=None, context=None, + timestamp=None, integrations=None, message_id=None): + context = context or {} + integrations = integrations or {} + require('previous_id', previous_id, ID_TYPES) + require('user_id', user_id, ID_TYPES) + + msg = { + 'integrations': integrations, + 'previousId': previous_id, + 'timestamp': timestamp, + 'context': context, + 'userId': user_id, + 'type': 'alias', + 'messageId': message_id, + } + + return self._enqueue(msg) + + def group(self, user_id=None, group_id=None, traits=None, context=None, + timestamp=None, anonymous_id=None, integrations=None, + message_id=None): + traits = traits or {} + context = context or {} + integrations = integrations or {} + require('user_id or anonymous_id', user_id or anonymous_id, ID_TYPES) + require('group_id', group_id, ID_TYPES) + require('traits', traits, dict) + + msg = { + 'integrations': integrations, + 'anonymousId': anonymous_id, + 'timestamp': timestamp, + 'groupId': group_id, + 'context': context, + 'userId': user_id, + 'traits': traits, + 'type': 'group', + 'messageId': message_id, + } + + return self._enqueue(msg) + + def page(self, user_id=None, category=None, name=None, properties=None, + context=None, timestamp=None, anonymous_id=None, + integrations=None, message_id=None): + properties = properties or {} + context = context or {} + integrations = integrations or {} + require('user_id or anonymous_id', user_id or anonymous_id, ID_TYPES) + require('properties', properties, dict) + + if name: + require('name', name, str) + if category: + require('category', category, str) + + msg = { + 'integrations': integrations, + 'anonymousId': anonymous_id, + 'properties': properties, + 'timestamp': timestamp, + 'category': category, + 'context': context, + 'userId': user_id, + 'type': 'page', + 'name': name, + 
'messageId': message_id, + } + + return self._enqueue(msg) + + def screen(self, user_id=None, category=None, name=None, properties=None, + context=None, timestamp=None, anonymous_id=None, + integrations=None, message_id=None): + properties = properties or {} + context = context or {} + integrations = integrations or {} + require('user_id or anonymous_id', user_id or anonymous_id, ID_TYPES) + require('properties', properties, dict) + + if name: + require('name', name, str) + if category: + require('category', category, str) + + msg = { + 'integrations': integrations, + 'anonymousId': anonymous_id, + 'properties': properties, + 'timestamp': timestamp, + 'category': category, + 'context': context, + 'userId': user_id, + 'type': 'screen', + 'name': name, + 'messageId': message_id, + } + + return self._enqueue(msg) + + def _enqueue(self, msg): + """Push a new `msg` onto the queue, return `(success, msg)`""" + timestamp = msg['timestamp'] + if timestamp is None: + timestamp = datetime.now(tz=tzutc()) + message_id = msg.get('messageId') + if message_id is None: + message_id = uuid4() + + require('integrations', msg['integrations'], dict) + require('type', msg['type'], str) + require('timestamp', timestamp, datetime) + require('context', msg['context'], dict) + + # add common + timestamp = guess_timezone(timestamp) + msg['timestamp'] = timestamp.isoformat(timespec='milliseconds') + msg['messageId'] = stringify_id(message_id) + msg['context']['library'] = { + 'name': 'analytics-python', + 'version': VERSION + } + + msg['userId'] = stringify_id(msg.get('userId', None)) + msg['anonymousId'] = stringify_id(msg.get('anonymousId', None)) + + msg = clean(msg) + self.log.debug('queueing: %s', msg) + + # Check message size. + msg_size = len(json.dumps(msg, cls=DatetimeSerializer).encode()) + if msg_size > MAX_MSG_SIZE: + raise RuntimeError('Message exceeds %skb limit. (%s)', str(int(MAX_MSG_SIZE / 1024)), str(msg)) + + # if send is False, return msg as if it was successfully queued + if not self.send: + return True, msg + + if self.sync_mode: + self.log.debug('enqueued with blocking %s.', msg['type']) + post(self.write_key, self.host, gzip=self.gzip, + timeout=self.timeout, proxies=self.proxies, + oauth_manager=self.oauth_manager, batch=[msg]) + + return True, msg + + try: + self.queue.put(msg, block=False) + self.log.debug('enqueued %s.', msg['type']) + return True, msg + except queue.Full: + self.log.warning('analytics-python queue is full') + return False, msg + + def flush(self): + """Forces a flush from the internal queue to the server""" + queue = self.queue + size = queue.qsize() + queue.join() + # Note that this message may not be precise, because of threading. + self.log.debug('successfully flushed about %s items.', size) + + def join(self): + """Ends the consumer thread once the queue is empty. 
+ Blocks execution until finished + """ + for consumer in self.consumers: + consumer.pause() + try: + consumer.join() + except RuntimeError: + # consumer thread has not started + pass + + def shutdown(self): + """Flush all messages and cleanly shutdown the client""" + self.flush() + self.join() + + +def require(name, field, data_type): + """Require that the named `field` has the right `data_type`""" + if not isinstance(field, data_type): + msg = '{0} must have {1}, got: {2}'.format(name, data_type, field) + raise AssertionError(msg) + + +def stringify_id(val): + if val is None: + return None + if isinstance(val, str): + return val + return str(val) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py new file mode 100644 index 00000000..157e3c93 --- /dev/null +++ b/segment/analytics/consumer.py @@ -0,0 +1,159 @@ +import logging +import time +from threading import Thread +import backoff +import json + +from segment.analytics.request import post, APIError, DatetimeSerializer + +from queue import Empty + +MAX_MSG_SIZE = 32 << 10 + +# Our servers only accept batches less than 500KB. Here limit is set slightly +# lower to leave space for extra data that will be added later, eg. "sentAt". +BATCH_SIZE_LIMIT = 475000 + + +class FatalError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + msg = "[Segment] {0})" + return msg.format(self.message) + + +class Consumer(Thread): + """Consumes the messages from the client's queue.""" + log = logging.getLogger('segment') + + def __init__(self, queue, write_key, upload_size=100, host=None, + on_error=None, upload_interval=0.5, gzip=False, retries=10, + timeout=15, proxies=None, oauth_manager=None): + """Create a consumer thread.""" + Thread.__init__(self) + # Make consumer a daemon thread so that it doesn't block program exit + self.daemon = True + self.upload_size = upload_size + self.upload_interval = upload_interval + self.write_key = write_key + self.host = host + self.on_error = on_error + self.queue = queue + self.gzip = gzip + # It's important to set running in the constructor: if we are asked to + # pause immediately after construction, we might set running to True in + # run() *after* we set it to False in pause... and keep running + # forever. 
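# Hypothetical sketch (not part of this change): the MAX_MSG_SIZE and BATCH_SIZE_LIMIT
# constants defined near the top of this module are compared against the serialized JSON
# byte length of each message, computed the same way the consumer computes it in next().
import json
from segment.analytics.request import DatetimeSerializer

sample_msg = {'type': 'track', 'event': 'python event', 'userId': 'user_123'}
size_bytes = len(json.dumps(sample_msg, cls=DatetimeSerializer).encode())
assert size_bytes <= (32 << 10)  # oversize items are logged and dropped by Consumer.next()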
+ self.running = True + self.retries = retries + self.timeout = timeout + self.proxies = proxies + self.oauth_manager = oauth_manager + + def run(self): + """Runs the consumer.""" + self.log.debug('consumer is running...') + while self.running: + self.upload() + + self.log.debug('consumer exited.') + + def pause(self): + """Pause the consumer.""" + self.running = False + + def upload(self): + """Upload the next batch of items, return whether successful.""" + success = False + batch = self.next() + if len(batch) == 0: + return False + + try: + self.request(batch) + success = True + except Exception as e: + self.log.error('error uploading: %s', e) + success = False + if self.on_error: + self.on_error(e, batch) + finally: + # mark items as acknowledged from queue + for _ in batch: + self.queue.task_done() + return success + + def next(self): + """Return the next batch of items to upload.""" + queue = self.queue + items = [] + + start_time = time.monotonic() + total_size = 0 + + while len(items) < self.upload_size: + elapsed = time.monotonic() - start_time + if elapsed >= self.upload_interval: + break + try: + item = queue.get( + block=True, timeout=self.upload_interval - elapsed) + item_size = len(json.dumps( + item, cls=DatetimeSerializer).encode()) + if item_size > MAX_MSG_SIZE: + self.log.error( + 'Item exceeds 32kb limit, dropping. (%s)', str(item)) + continue + items.append(item) + total_size += item_size + if total_size >= BATCH_SIZE_LIMIT: + self.log.debug( + 'hit batch size limit (size: %d)', total_size) + break + except Empty: + break + except Exception as e: + self.log.exception('Exception: %s', e) + + return items + + def request(self, batch): + """Attempt to upload the batch and retry before raising an error """ + + def fatal_exception(exc): + if isinstance(exc, APIError): + # retry on server errors and client errors + # with 429 status code (rate limited), + # don't retry on other client errors + return (400 <= exc.status < 500) and exc.status != 429 + elif isinstance(exc, FatalError): + return True + else: + # retry on all other errors (eg. network) + return False + + attempt_count = 0 + + @backoff.on_exception( + backoff.expo, + Exception, + max_tries=self.retries + 1, + giveup=fatal_exception, + on_backoff=lambda details: self.log.debug( + f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s" + )) + def send_request(): + nonlocal attempt_count + attempt_count += 1 + try: + return post(self.write_key, self.host, gzip=self.gzip, + timeout=self.timeout, batch=batch, proxies=self.proxies, + oauth_manager=self.oauth_manager) + except Exception as e: + if attempt_count >= self.retries + 1: + self.log.error(f"All {self.retries} retries exhausted. 
Final error: {e}") + raise + + send_request() diff --git a/segment/analytics/oauth_manager.py b/segment/analytics/oauth_manager.py new file mode 100644 index 00000000..61a54ea5 --- /dev/null +++ b/segment/analytics/oauth_manager.py @@ -0,0 +1,208 @@ +from datetime import date, datetime +import logging +import threading +import time +import uuid +from requests import sessions +import jwt + +from segment.analytics import utils +from segment.analytics.request import APIError +from segment.analytics.consumer import FatalError + +_session = sessions.Session() + +class OauthManager(object): + def __init__(self, + client_id, + client_key, + key_id, + auth_server='https://oauth2.segment.io', + scope='tracking_api:write', + timeout=15, + max_retries=3): + self.client_id = client_id + self.client_key = client_key + self.key_id = key_id + self.auth_server = auth_server + self.scope = scope + self.timeout = timeout + self.max_retries = max_retries + self.retry_count = 0 + self.clock_skew = 0 + + self.log = logging.getLogger('segment') + self.thread = None + self.token_mutex = threading.Lock() + self.token = None + self.error = None + + def get_token(self): + with self.token_mutex: + if self.token: + return self.token + # No good token, start the loop + self.log.debug("OAuth is enabled. No cached access token.") + # Make sure we're not waiting an excessively long time (this will not cancel 429 waits) + if self.thread and self.thread.is_alive(): + self.thread.cancel() + self.thread = threading.Timer(0,self._poller_loop) + self.thread.daemon = True + self.thread.start() + + while True: + # Wait for a token or error + with self.token_mutex: + if self.token: + return self.token + if self.error: + error = self.error + self.error = None + raise error + if self.thread: + # Wait for a cycle, may not have an answer immediately + self.thread.join(1) + + def clear_token(self): + self.log.debug("OAuth Token invalidated. 
Poller for new token is {}".format( + "active" if self.thread and self.thread.is_alive() else "stopped" )) + with self.token_mutex: + self.token = None + + def _request_token(self): + jwt_body = { + "iss": self.client_id, + "sub": self.client_id, + "aud": utils.remove_trailing_slash(self.auth_server), + "iat": int(time.time())-5 - self.clock_skew, + "exp": int(time.time()) + 55 - self.clock_skew, + "jti": str(uuid.uuid4()) + } + + signed_jwt = jwt.encode( + jwt_body, + self.client_key, + algorithm="RS256", + headers={"kid": self.key_id}, + ) + + request_body = 'grant_type=client_credentials&client_assertion_type='\ + 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer&'\ + 'client_assertion={}&scope={}'.format(signed_jwt, self.scope) + + token_endpoint = f'{utils.remove_trailing_slash(self.auth_server)}/token' + + self.log.debug("OAuth token requested from {} with size {}".format(token_endpoint, len(request_body))) + + res = _session.post(url=token_endpoint, data=request_body, timeout=self.timeout, + headers={'Content-Type': 'application/x-www-form-urlencoded'}) + return res + + def _poller_loop(self): + refresh_timer_ms = 25 + response = None + + try: + response = self._request_token() + except Exception as e: + self.retry_count += 1 + if self.retry_count < self.max_retries: + self.log.debug("OAuth token request encountered an error on attempt {}: {}".format(self.retry_count ,e)) + self.thread = threading.Timer(refresh_timer_ms / 1000.0, self._poller_loop) + self.thread.daemon = True + self.thread.start() + return + # Too many retries, giving up + self.log.error("OAuth token request encountered an error after {} attempts: {}".format(self.retry_count ,e)) + self.error = FatalError(str(e)) + return + if response.headers.get("Date"): + try: + server_time = datetime.strptime(response.headers.get("Date"), "%a, %d %b %Y %H:%M:%S %Z") + self.clock_skew = int((datetime.utcnow() - server_time).total_seconds()) + except Exception as e: + self.log.error("OAuth token request received a response with an invalid Date header: {} | {}".format(response, e)) + + if response.status_code == 200: + data = None + try: + data = response.json() + except Exception as e: + self.retry_count += 1 + if self.retry_count < self.max_retries: + self.thread = threading.Timer(refresh_timer_ms / 1000.0, self._poller_loop) + self.thread.daemon = True + self.thread.start() + return + # Too many retries, giving up + self.error = e + return + try: + with self.token_mutex: + self.token = data['access_token'] + # success! + self.retry_count = 0 + except Exception as e: + # No access token in response? + self.log.error("OAuth token request received a successful response with a missing token: {}".format(response)) + try: + refresh_timer_ms = int(data['expires_in']) / 2 * 1000 + except Exception as e: + refresh_timer_ms = 60 * 1000 + + elif response.status_code == 429: + self.retry_count += 1 + rate_limit_reset_time = None + try: + rate_limit_reset_time = int(response.headers.get("X-RateLimit-Reset")) + except Exception as e: + self.log.error("OAuth rate limit response did not have a valid rest time: {} | {}".format(response, e)) + if rate_limit_reset_time: + refresh_timer_ms = rate_limit_reset_time * 1000 + else: + refresh_timer_ms = 5 * 1000 + + self.log.debug("OAuth token request encountered a rate limit response, waiting {} ms".format(refresh_timer_ms)) + # We want subsequent calls to get_token to be able to interrupt our + # Timeout when it's waiting for e.g. 
a long normal expiration, but + # not when we're waiting for a rate limit reset. Sleep instead. + time.sleep(refresh_timer_ms / 1000.0) + refresh_timer_ms = 0 + elif response.status_code in [400, 401, 415]: + # unrecoverable errors (except for skew). APIError will be handled by request.py + self.retry_count = 0 + try: + payload = response.json() + + if (payload.get('error') == 'invalid_request' and + (payload.get('error_description') == 'Token is expired' or + payload.get('error_description') == 'Token used before issued')): + refresh_timer_ms = 0 # Retry immediately hopefully with a good skew value + self.thread = threading.Timer(refresh_timer_ms / 1000.0, self._poller_loop) + self.thread.daemon = True + self.thread.start() + return + + self.error = APIError(response.status_code, payload['error'], payload['error_description']) + except ValueError: + self.error = APIError(response.status_code, 'unknown', response.text) + self.log.error("OAuth token request error was unrecoverable, possibly due to configuration: {}".format(self.error)) + return + else: + # any other error + self.log.debug("OAuth token request error, attempt {}: [{}] {}".format(self.retry_count, response.status_code, response.reason)) + self.retry_count += 1 + + if self.retry_count > 0 and self.retry_count % self.max_retries == 0: + # every time we pass the retry count, put up an error to release any waiting token requests + try: + payload = response.json() + self.error = APIError(response.status_code, payload['error'], payload['error_description']) + except ValueError: + self.error = APIError(response.status_code, 'unknown', response.text) + self.log.error("OAuth token request error after {} attempts: {}".format(self.retry_count, self.error)) + + # loop + self.thread = threading.Timer(refresh_timer_ms / 1000.0, self._poller_loop) + self.thread.daemon = True + self.thread.start() diff --git a/segment/analytics/request.py b/segment/analytics/request.py new file mode 100644 index 00000000..ab92b807 --- /dev/null +++ b/segment/analytics/request.py @@ -0,0 +1,92 @@ +from datetime import date, datetime +from io import BytesIO +from gzip import GzipFile +import logging +import json +from dateutil.tz import tzutc +from requests.auth import HTTPBasicAuth +from requests import sessions + +from segment.analytics.version import VERSION +from segment.analytics.utils import remove_trailing_slash + +_session = sessions.Session() + + +def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, **kwargs): + """Post the `kwargs` to the API""" + log = logging.getLogger('segment') + body = kwargs + if not "sentAt" in body.keys(): + body["sentAt"] = datetime.now(tz=tzutc()).isoformat() + body["writeKey"] = write_key + url = remove_trailing_slash(host or 'https://api.segment.io') + '/v1/batch' + auth = None + if oauth_manager: + auth = oauth_manager.get_token() + data = json.dumps(body, cls=DatetimeSerializer) + log.debug('making request: %s', data) + headers = { + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-python/' + VERSION + } + if auth: + headers['Authorization'] = 'Bearer {}'.format(auth) + + if gzip: + headers['Content-Encoding'] = 'gzip' + buf = BytesIO() + with GzipFile(fileobj=buf, mode='w') as gz: + # 'data' was produced by json.dumps(), + # whose default encoding is utf-8. 
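# Hypothetical sketch (not part of this change): the gzip branch above writes the UTF-8
# JSON through GzipFile, so the receiving end can recover it with a plain gzip decompress;
# round-tripping a payload the same way illustrates the encoding.
import gzip
import json
from io import BytesIO
from gzip import GzipFile

example = json.dumps({'batch': [], 'sentAt': '2024-01-01T00:00:00+00:00'})
buffer = BytesIO()
with GzipFile(fileobj=buffer, mode='w') as gzipped:
    gzipped.write(example.encode('utf-8'))
assert gzip.decompress(buffer.getvalue()).decode('utf-8') == example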
+ gz.write(data.encode('utf-8')) + data = buf.getvalue() + + kwargs = { + "data": data, + "headers": headers, + "timeout": timeout, + } + + if proxies: + kwargs['proxies'] = proxies + + try: + res = _session.post(url, **kwargs) + except Exception as e: + raise e + + if res.status_code == 200: + log.debug('data uploaded successfully') + return res + + if oauth_manager and res.status_code in [400, 401, 403]: + oauth_manager.clear_token() + + try: + payload = res.json() + log.debug('received response: %s', payload) + raise APIError(res.status_code, payload['code'], payload['message']) + except ValueError: + log.error('Unknown error: [%s] %s', res.status_code, res.reason) + raise APIError(res.status_code, 'unknown', res.text) + + +class APIError(Exception): + + def __init__(self, status, code, message): + self.message = message + self.status = status + self.code = code + + def __str__(self): + msg = "[Segment] {0}: {1} ({2})" + return msg.format(self.code, self.message, self.status) + + +class DatetimeSerializer(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, (date, datetime)): + return obj.isoformat() + + return json.JSONEncoder.default(self, obj) diff --git a/segment/analytics/test/__init__.py b/segment/analytics/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/segment/analytics/test/test_client.py b/segment/analytics/test/test_client.py new file mode 100644 index 00000000..eb68400c --- /dev/null +++ b/segment/analytics/test/test_client.py @@ -0,0 +1,362 @@ +from datetime import date, datetime +import unittest +import time +import mock + +from segment.analytics.version import VERSION +from segment.analytics.client import Client + + +class TestClient(unittest.TestCase): + + def fail(self, e, batch=[]): + """Mark the failure handler""" + self.failed = True + + def setUp(self): + self.failed = False + self.client = Client('testsecret', on_error=self.fail) + + def test_requires_write_key(self): + self.assertRaises(AssertionError, Client) + + def test_empty_flush(self): + self.client.flush() + + def test_basic_track(self): + client = self.client + success, msg = client.track('userId', 'python test event') + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + + self.assertEqual(msg['event'], 'python test event') + self.assertTrue(isinstance(msg['timestamp'], str)) + self.assertTrue(isinstance(msg['messageId'], str)) + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['properties'], {}) + self.assertEqual(msg['type'], 'track') + + def test_stringifies_user_id(self): + # A large number that loses precision in node: + # node -e "console.log(157963456373623802 + 1)" > 157963456373623800 + client = self.client + success, msg = client.track( + user_id=157963456373623802, event='python test event') + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + + self.assertEqual(msg['userId'], '157963456373623802') + self.assertEqual(msg['anonymousId'], None) + + def test_stringifies_anonymous_id(self): + # A large number that loses precision in node: + # node -e "console.log(157963456373623803 + 1)" > 157963456373623800 + client = self.client + success, msg = client.track( + anonymous_id=157963456373623803, event='python test event') + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + + self.assertEqual(msg['userId'], None) + self.assertEqual(msg['anonymousId'], '157963456373623803') + + def test_advanced_track(self): + client = self.client + success, msg = client.track( + 
'userId', 'python test event', {'property': 'value'}, + {'ip': '192.168.0.1'}, datetime(2014, 9, 3), 'anonymousId', + {'Amplitude': True}, 'messageId') + + self.assertTrue(success) + + self.assertEqual(msg['timestamp'], '2014-09-03T00:00:00+00:00') + self.assertEqual(msg['properties'], {'property': 'value'}) + self.assertEqual(msg['integrations'], {'Amplitude': True}) + self.assertEqual(msg['context']['ip'], '192.168.0.1') + self.assertEqual(msg['event'], 'python test event') + self.assertEqual(msg['anonymousId'], 'anonymousId') + self.assertEqual(msg['context']['library'], { + 'name': 'analytics-python', + 'version': VERSION + }) + self.assertEqual(msg['messageId'], 'messageId') + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'track') + + def test_basic_identify(self): + client = self.client + success, msg = client.identify('userId', {'trait': 'value'}) + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + + self.assertEqual(msg['traits'], {'trait': 'value'}) + self.assertTrue(isinstance(msg['timestamp'], str)) + self.assertTrue(isinstance(msg['messageId'], str)) + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'identify') + + def test_advanced_identify(self): + client = self.client + success, msg = client.identify( + 'userId', {'trait': 'value'}, {'ip': '192.168.0.1'}, + datetime(2014, 9, 3), 'anonymousId', {'Amplitude': True}, + 'messageId') + + self.assertTrue(success) + + self.assertEqual(msg['timestamp'], '2014-09-03T00:00:00+00:00') + self.assertEqual(msg['integrations'], {'Amplitude': True}) + self.assertEqual(msg['context']['ip'], '192.168.0.1') + self.assertEqual(msg['traits'], {'trait': 'value'}) + self.assertEqual(msg['anonymousId'], 'anonymousId') + self.assertEqual(msg['context']['library'], { + 'name': 'analytics-python', + 'version': VERSION + }) + self.assertTrue(isinstance(msg['timestamp'], str)) + self.assertEqual(msg['messageId'], 'messageId') + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'identify') + + def test_basic_group(self): + client = self.client + success, msg = client.group('userId', 'groupId') + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + + self.assertEqual(msg['groupId'], 'groupId') + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'group') + + def test_advanced_group(self): + client = self.client + success, msg = client.group( + 'userId', 'groupId', {'trait': 'value'}, {'ip': '192.168.0.1'}, + datetime(2014, 9, 3), 'anonymousId', {'Amplitude': True}, + 'messageId') + + self.assertTrue(success) + + self.assertEqual(msg['timestamp'], '2014-09-03T00:00:00+00:00') + self.assertEqual(msg['integrations'], {'Amplitude': True}) + self.assertEqual(msg['context']['ip'], '192.168.0.1') + self.assertEqual(msg['traits'], {'trait': 'value'}) + self.assertEqual(msg['anonymousId'], 'anonymousId') + self.assertEqual(msg['context']['library'], { + 'name': 'analytics-python', + 'version': VERSION + }) + self.assertTrue(isinstance(msg['timestamp'], str)) + self.assertEqual(msg['messageId'], 'messageId') + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'group') + + def test_basic_alias(self): + client = self.client + success, msg = client.alias('previousId', 'userId') + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + self.assertEqual(msg['previousId'], 'previousId') + self.assertEqual(msg['userId'], 'userId') + + def test_basic_page(self): + client = 
self.client + success, msg = client.page('userId', name='name') + self.assertFalse(self.failed) + client.flush() + self.assertTrue(success) + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'page') + self.assertEqual(msg['name'], 'name') + + def test_advanced_page(self): + client = self.client + success, msg = client.page( + 'userId', 'category', 'name', {'property': 'value'}, + {'ip': '192.168.0.1'}, datetime(2014, 9, 3), 'anonymousId', + {'Amplitude': True}, 'messageId') + + self.assertTrue(success) + + self.assertEqual(msg['timestamp'], '2014-09-03T00:00:00+00:00') + self.assertEqual(msg['integrations'], {'Amplitude': True}) + self.assertEqual(msg['context']['ip'], '192.168.0.1') + self.assertEqual(msg['properties'], {'property': 'value'}) + self.assertEqual(msg['anonymousId'], 'anonymousId') + self.assertEqual(msg['context']['library'], { + 'name': 'analytics-python', + 'version': VERSION + }) + self.assertEqual(msg['category'], 'category') + self.assertTrue(isinstance(msg['timestamp'], str)) + self.assertEqual(msg['messageId'], 'messageId') + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'page') + self.assertEqual(msg['name'], 'name') + + def test_basic_screen(self): + client = self.client + success, msg = client.screen('userId', name='name') + client.flush() + self.assertTrue(success) + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'screen') + self.assertEqual(msg['name'], 'name') + + def test_advanced_screen(self): + client = self.client + success, msg = client.screen( + 'userId', 'category', 'name', {'property': 'value'}, + {'ip': '192.168.0.1'}, datetime(2014, 9, 3), 'anonymousId', + {'Amplitude': True}, 'messageId') + + self.assertTrue(success) + + self.assertEqual(msg['timestamp'], '2014-09-03T00:00:00+00:00') + self.assertEqual(msg['integrations'], {'Amplitude': True}) + self.assertEqual(msg['context']['ip'], '192.168.0.1') + self.assertEqual(msg['properties'], {'property': 'value'}) + self.assertEqual(msg['anonymousId'], 'anonymousId') + self.assertEqual(msg['context']['library'], { + 'name': 'analytics-python', + 'version': VERSION + }) + self.assertTrue(isinstance(msg['timestamp'], str)) + self.assertEqual(msg['messageId'], 'messageId') + self.assertEqual(msg['category'], 'category') + self.assertEqual(msg['userId'], 'userId') + self.assertEqual(msg['type'], 'screen') + self.assertEqual(msg['name'], 'name') + + def test_flush(self): + client = self.client + # set up the consumer with more requests than a single batch will allow + for _ in range(1000): + _, _ = client.identify('userId', {'trait': 'value'}) + # We can't reliably assert that the queue is non-empty here; that's + # a race condition. We do our best to load it up though. + client.flush() + # Make sure that the client queue is empty after flushing + self.assertTrue(client.queue.empty()) + + def test_shutdown(self): + client = self.client + # set up the consumer with more requests than a single batch will allow + for _ in range(1000): + _, _ = client.identify('userId', {'trait': 'value'}) + client.shutdown() + # we expect two things after shutdown: + # 1. client queue is empty + # 2. 
consumer thread has stopped + self.assertTrue(client.queue.empty()) + for consumer in client.consumers: + self.assertFalse(consumer.is_alive()) + + def test_synchronous(self): + client = Client('testsecret', sync_mode=True) + + success, _ = client.identify('userId') + self.assertFalse(client.consumers) + self.assertTrue(client.queue.empty()) + self.assertTrue(success) + + def test_overflow(self): + client = Client('testsecret', max_queue_size=1) + # Ensure consumer thread is no longer uploading + client.join() + + for _ in range(10): + client.identify('userId') + + success, _ = client.identify('userId') + # Make sure we are informed that the queue is at capacity + self.assertFalse(success) + + def test_success_on_invalid_write_key(self): + client = Client('bad_key', on_error=self.fail) + client.track('userId', 'event') + client.flush() + self.assertFalse(self.failed) + + def test_unicode(self): + Client('unicode_key') + + def test_numeric_user_id(self): + self.client.track(1234, 'python event') + self.client.flush() + self.assertFalse(self.failed) + + def test_debug(self): + Client('bad_key', debug=True) + self.client.log.setLevel(0) # reset log level after debug enable + + def test_identify_with_date_object(self): + client = self.client + success, msg = client.identify( + 'userId', + { + 'birthdate': date(1981, 2, 2), + }, + ) + client.flush() + self.assertTrue(success) + self.assertFalse(self.failed) + + self.assertEqual(msg['traits'], {'birthdate': date(1981, 2, 2)}) + + def test_gzip(self): + client = Client('testsecret', on_error=self.fail, gzip=True) + for _ in range(10): + client.identify('userId', {'trait': 'value'}) + client.flush() + self.assertFalse(self.failed) + + def test_user_defined_upload_size(self): + client = Client('testsecret', on_error=self.fail, + upload_size=10, upload_interval=3) + + def mock_post_fn(*args, **kwargs): + self.assertEqual(len(kwargs['batch']), 10) + + # the post function should be called 2 times, with a batch size of 10 + # each time. 
+ with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn) \ + as mock_post: + for _ in range(20): + client.identify('userId', {'trait': 'value'}) + time.sleep(1) + self.assertEqual(mock_post.call_count, 2) + + def test_user_defined_timeout(self): + client = Client('testsecret', timeout=10) + for consumer in client.consumers: + self.assertEqual(consumer.timeout, 10) + + def test_default_timeout_15(self): + client = Client('testsecret') + for consumer in client.consumers: + self.assertEqual(consumer.timeout, 15) + + def test_proxies(self): + proxies={'http':'203.243.63.16:80','https':'203.243.63.16:80'} + client = Client('testsecret', proxies=proxies) + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 200 + res.json.return_value = {'code': 'success', 'message': 'success'} + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + success, msg = client.identify('userId', {'trait': 'value'}) + client.flush() + self.assertTrue(success) + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + self.assertIn('proxies', kwargs) + self.assertEqual(kwargs['proxies'], proxies) \ No newline at end of file diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py new file mode 100644 index 00000000..83717266 --- /dev/null +++ b/segment/analytics/test/test_consumer.py @@ -0,0 +1,222 @@ +import unittest +import mock +import time +import json + +try: + from queue import Queue +except ImportError: + from Queue import Queue + +from segment.analytics.consumer import Consumer, MAX_MSG_SIZE +from segment.analytics.request import APIError + + +class TestConsumer(unittest.TestCase): + + def test_next(self): + q = Queue() + consumer = Consumer(q, '') + q.put(1) + next = consumer.next() + self.assertEqual(next, [1]) + + def test_next_limit(self): + q = Queue() + upload_size = 50 + consumer = Consumer(q, '', upload_size) + for i in range(10000): + q.put(i) + next = consumer.next() + self.assertEqual(next, list(range(upload_size))) + + def test_dropping_oversize_msg(self): + q = Queue() + consumer = Consumer(q, '') + oversize_msg = {'m': 'x' * MAX_MSG_SIZE} + q.put(oversize_msg) + next = consumer.next() + self.assertEqual(next, []) + self.assertTrue(q.empty()) + + def test_upload(self): + q = Queue() + consumer = Consumer(q, 'testsecret') + track = { + 'type': 'track', + 'event': 'python event', + 'userId': 'userId' + } + q.put(track) + success = consumer.upload() + self.assertTrue(success) + + def test_upload_interval(self): + # Put _n_ items in the queue, pausing a little bit more than + # _upload_interval_ after each one. + # The consumer should upload _n_ times. + q = Queue() + upload_interval = 0.3 + consumer = Consumer(q, 'testsecret', upload_size=10, + upload_interval=upload_interval) + with mock.patch('segment.analytics.consumer.post') as mock_post: + consumer.start() + for i in range(0, 3): + track = { + 'type': 'track', + 'event': 'python event %d' % i, + 'userId': 'userId' + } + q.put(track) + time.sleep(upload_interval * 1.1) + self.assertEqual(mock_post.call_count, 3) + + def test_multiple_uploads_per_interval(self): + # Put _upload_size*2_ items in the queue at once, then pause for + # _upload_interval_. The consumer should upload 2 times. 
+ q = Queue() + upload_interval = 0.5 + upload_size = 10 + consumer = Consumer(q, 'testsecret', upload_size=upload_size, + upload_interval=upload_interval) + with mock.patch('segment.analytics.consumer.post') as mock_post: + consumer.start() + for i in range(0, upload_size * 2): + track = { + 'type': 'track', + 'event': 'python event %d' % i, + 'userId': 'userId' + } + q.put(track) + time.sleep(upload_interval * 1.1) + self.assertEqual(mock_post.call_count, 2) + + @classmethod + def test_request(cls): + consumer = Consumer(None, 'testsecret') + track = { + 'type': 'track', + 'event': 'python event', + 'userId': 'userId' + } + consumer.request([track]) + + def _test_request_retry(self, consumer, + expected_exception, exception_count): + + def mock_post(*args, **kwargs): + mock_post.call_count += 1 + if mock_post.call_count <= exception_count: + raise expected_exception + mock_post.call_count = 0 + + with mock.patch('segment.analytics.consumer.post', + mock.Mock(side_effect=mock_post)): + track = { + 'type': 'track', + 'event': 'python event', + 'userId': 'userId' + } + # request() should succeed if the number of exceptions raised is + # less than the retries parameter. + if exception_count <= consumer.retries: + consumer.request([track]) + else: + # if exceptions are raised more times than the retries + # parameter, we expect the exception to be returned to + # the caller. + try: + consumer.request([track]) + except type(expected_exception) as exc: + self.assertEqual(exc, expected_exception) + else: + self.fail( + "request() should raise an exception if still failing " + "after %d retries" % consumer.retries) + + def test_request_retry(self): + # we should retry on general errors + consumer = Consumer(None, 'testsecret') + self._test_request_retry(consumer, Exception('generic exception'), 2) + + # we should retry on server errors + consumer = Consumer(None, 'testsecret') + self._test_request_retry(consumer, APIError( + 500, 'code', 'Internal Server Error'), 2) + + # we should retry on HTTP 429 errors + consumer = Consumer(None, 'testsecret') + self._test_request_retry(consumer, APIError( + 429, 'code', 'Too Many Requests'), 2) + + # we should NOT retry on other client errors + consumer = Consumer(None, 'testsecret') + api_error = APIError(400, 'code', 'Client Errors') + try: + self._test_request_retry(consumer, api_error, 1) + except APIError: + pass + else: + self.fail('request() should not retry on client errors') + + # test for number of exceptions raise > retries value + consumer = Consumer(None, 'testsecret', retries=3) + self._test_request_retry(consumer, APIError( + 500, 'code', 'Internal Server Error'), 3) + + def test_pause(self): + consumer = Consumer(None, 'testsecret') + consumer.pause() + self.assertFalse(consumer.running) + + def test_max_batch_size(self): + q = Queue() + consumer = Consumer( + q, 'testsecret', upload_size=100000, upload_interval=3) + track = { + 'type': 'track', + 'event': 'python event', + 'userId': 'userId' + } + msg_size = len(json.dumps(track).encode()) + # number of messages in a maximum-size batch + n_msgs = int(475000 / msg_size) + + def mock_post_fn(_, data, **kwargs): + res = mock.Mock() + res.status_code = 200 + self.assertTrue(len(data.encode()) < 500000, + 'batch size (%d) exceeds 500KB limit' + % len(data.encode())) + return res + + with mock.patch('segment.analytics.request._session.post', + side_effect=mock_post_fn) as mock_post: + consumer.start() + for _ in range(0, n_msgs + 2): + q.put(track) + q.join() + 
self.assertEqual(mock_post.call_count, 2) + + @classmethod + def test_proxies(cls): + proxies = {'http': '203.243.63.16:80', 'https': '203.243.63.16:80'} + consumer = Consumer(None, 'testsecret', proxies=proxies) + track = { + 'type': 'track', + 'event': 'python event', + 'userId': 'userId' + } + + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 200 + res.json.return_value = {'code': 'success', 'message': 'success'} + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + consumer.request([track]) + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + cls().assertIn('proxies', kwargs) + cls().assertEqual(kwargs['proxies'], proxies) diff --git a/segment/analytics/test/test_init.py b/segment/analytics/test/test_init.py new file mode 100644 index 00000000..98ad6aa3 --- /dev/null +++ b/segment/analytics/test/test_init.py @@ -0,0 +1,87 @@ +import unittest +import pkgutil +import logging +import sys +import segment.analytics as analytics +from segment.analytics.client import Client + + +def all_names(): + for _, modname, _ in pkgutil.iter_modules(__path__): + yield 'segment.analytics.test.' + modname + + +def all(): + logging.basicConfig(stream=sys.stderr) + return unittest.defaultTestLoader.loadTestsFromNames(all_names()) + + +class TestInit(unittest.TestCase): + def test_writeKey(self): + self.assertIsNone(analytics.default_client) + analytics.flush() + self.assertEqual(analytics.default_client.write_key, 'test-init') + + def test_debug(self): + self.assertIsNone(analytics.default_client) + analytics.debug = True + analytics.flush() + self.assertTrue(analytics.default_client.debug) + analytics.default_client = None + analytics.debug = False + analytics.flush() + self.assertFalse(analytics.default_client.debug) + analytics.default_client.log.setLevel(0) # reset log level after debug enable + + def test_gzip(self): + self.assertIsNone(analytics.default_client) + analytics.gzip = True + analytics.flush() + self.assertTrue(analytics.default_client.gzip) + analytics.default_client = None + analytics.gzip = False + analytics.flush() + self.assertFalse(analytics.default_client.gzip) + + def test_host(self): + self.assertIsNone(analytics.default_client) + analytics.host = 'http://test-host' + analytics.flush() + self.assertEqual(analytics.default_client.host, 'http://test-host') + analytics.host = None + analytics.default_client = None + + def test_max_queue_size(self): + self.assertIsNone(analytics.default_client) + analytics.max_queue_size = 1337 + analytics.flush() + self.assertEqual(analytics.default_client.queue.maxsize, 1337) + + def test_max_retries(self): + self.assertIsNone(analytics.default_client) + client = Client('testsecret', max_retries=42) + for consumer in client.consumers: + self.assertEqual(consumer.retries, 42) + + def test_sync_mode(self): + self.assertIsNone(analytics.default_client) + analytics.sync_mode = True + analytics.flush() + self.assertTrue(analytics.default_client.sync_mode) + analytics.default_client = None + analytics.sync_mode = False + analytics.flush() + self.assertFalse(analytics.default_client.sync_mode) + + def test_timeout(self): + self.assertIsNone(analytics.default_client) + analytics.timeout = 1.234 + analytics.flush() + self.assertEqual(analytics.default_client.timeout, 1.234) + + def setUp(self): + analytics.write_key = 'test-init' + analytics.default_client = None + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git 
a/segment/analytics/test/test_module.py b/segment/analytics/test/test_module.py new file mode 100644 index 00000000..e5fe598c --- /dev/null +++ b/segment/analytics/test/test_module.py @@ -0,0 +1,49 @@ +import unittest + +import segment.analytics as analytics + + +class TestModule(unittest.TestCase): + + # def failed(self): + # self.failed = True + + def setUp(self): + self.failed = False + analytics.write_key = 'testsecret' + analytics.on_error = self.failed + + def test_no_write_key(self): + analytics.write_key = None + self.assertRaises(Exception, analytics.track) + + def test_no_host(self): + analytics.host = None + self.assertRaises(Exception, analytics.track) + + def test_track(self): + analytics.track('userId', 'python module event') + analytics.flush() + + def test_identify(self): + analytics.identify('userId', {'email': 'user@email.com'}) + analytics.flush() + + def test_group(self): + analytics.group('userId', 'groupId') + analytics.flush() + + def test_alias(self): + analytics.alias('previousId', 'userId') + analytics.flush() + + def test_page(self): + analytics.page('userId') + analytics.flush() + + def test_screen(self): + analytics.screen('userId') + analytics.flush() + + def test_flush(self): + analytics.flush() diff --git a/segment/analytics/test/test_oauth.py b/segment/analytics/test/test_oauth.py new file mode 100644 index 00000000..a2269507 --- /dev/null +++ b/segment/analytics/test/test_oauth.py @@ -0,0 +1,155 @@ +from datetime import datetime +import threading +import time +import unittest +import mock +import sys +import os +sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../..")) +from segment.analytics.client import Client +import segment.analytics.oauth_manager +import requests + +privatekey = '''-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDVll7uJaH322IN +PQsH2aOXZJ2r1q+6hpVK1R5JV1p41PUzn8pOxyXFHWB+53dUd4B8qywKS36XQjp0 +VmhR1tQ22znQ9ZCM6y4LGeOJBjAZiFZLcGQNNrDFC0WGWTrK1ZTS2K7p5qy4fIXG +laNkMXiGGCawkgcHAdOvPTy8m1d9a6YSetYVmBP/tEYN95jPyZFIoHQfkQPBPr9W +cWPpdEBzasHV5d957akjurPpleDiD5as66UW4dkWXvS7Wu7teCLCyDApcyJKTb2Z +SXybmWjhIZuctZMAx3wT/GgW3FbkGaW5KLQgBUMzjpL0fCtMatlqckMD92ll1FuK +R+HnXu05AgMBAAECggEBAK4o2il4GDUh9zbyQo9ZIPLuwT6AZXRED3Igi3ykNQp4 +I6S/s9g+vQaY6LkyBnSiqOt/K/8NBiFSiJWaa5/n+8zrP56qzf6KOlYk+wsdN5Vq +PWtwLrUzljpl8YAWPEFunNa8hwwE42vfZbnDBKNLT4qQIOQzfnVxQOoQlfj49gM2 +iSrblvsnQTyucFy3UyTeioHbh8q2Xqcxry5WUCOrFDd3IIwATTpLZGw0IPeuFJbJ +NfBizLEcyJaM9hujQU8PRCRd16MWX+bbYM6Mh4dkT40QXWsVnHBHwsgPdQgDgseF +Na4ajtHoC0DlwYCXpCm3IzJfKfq/LR2q8NDUgKeF4AECgYEA9nD4czza3SRbzhpZ +bBoK77CSNqCcMAqyuHB0hp/XX3yB7flF9PIPb2ReO8wwmjbxn+bm7PPz2Uwd2SzO +pU+FXmyKJr53Jxw/hmDWZCoh42gsGDlVqpmytzsj74KlaYiMyZmEGbD7t/FGfNGV +LdLDJaHIYxEimFviOTXKCeKvPAECgYEA3d8tv4jdp1uAuRZiU9Z/tfw5mJOi3oXF +8AdFFDwaPzcTorEAxjrt9X6IjPbLIDJNJtuXYpe+dG6720KyuNnhLhWW9oZEJTwT +dUgqZ2fTCOS9uH0jSn+ZFlgTWI6UDQXRwE7z8avlhMIrQVmPsttGTo7V6sQVtGRx +bNj2RSVekTkCgYAJvy4UYLPHS0jWPfSLcfw8vp8JyhBjVgj7gncZW/kIrcP1xYYe +yfQSU8XmV40UjFfCGz/G318lmP0VOdByeVKtCV3talsMEPHyPqI8E+6DL/uOebYJ +qUqINK6XKnOgWOY4kvnGillqTQCcry1XQp61PlDOmj7kB75KxPXYrj6AAQKBgQDa ++ixCv6hURuEyy77cE/YT/Q4zYnL6wHjtP5+UKwWUop1EkwG6o+q7wtiul90+t6ah +1VUCP9X/QFM0Qg32l0PBohlO0pFrVnG17TW8vSHxwyDkds1f97N19BOT8ZR5jebI +sKPfP9LVRnY+l1BWLEilvB+xBzqMwh2YWkIlWI6PMQKBgGi6TBnxp81lOYrxVRDj +/3ycRnVDmBdlQKFunvfzUBmG1mG/G0YHeVSUKZJGX7w2l+jnDwIA383FcUeA8X6A +l9q+amhtkwD/6fbkAu/xoWNl+11IFoxd88y2ByBFoEKB6UVLuCTSKwXDqzEZet7x +mDyRxq7ohIzLkw8b8buDeuXZ +-----END PRIVATE KEY-----''' + +def mocked_requests_get(*args, 
**kwargs): + class MockResponse: + def __init__(self, data, status_code): + self.__dict__['headers'] = {'date': datetime.now().strftime("%a, %d %b %Y %H:%M:%S GMT")} + self.__dict__.update(data) + self.status_code = status_code + + def json(self): + return self.json_data + if 'url' not in kwargs: + kwargs['url'] = args[0] + if kwargs['url'] == 'http://127.0.0.1:80/token': + return MockResponse({"json_data" : {"access_token": "test_token", "expires_in": 4000}}, 200) + elif kwargs['url'] == 'http://127.0.0.1:400/token': + return MockResponse({"reason": "test_reason", "json_data" : {"error":"unrecoverable", "error_description":"nah"}}, 400) + elif kwargs['url'] == 'http://127.0.0.1:429/token': + return MockResponse({"reason": "test_reason", "headers" : {"X-RateLimit-Reset": 234}}, 429) + elif kwargs['url'] == 'http://127.0.0.1:500/token': + return MockResponse({"reason": "test_reason", "json_data" : {"error":"recoverable", "error_description":"nah"}}, 500) + elif kwargs['url'] == 'http://127.0.0.1:501/token': + if mocked_requests_get.error_count < 0 or mocked_requests_get.error_count > 0: + if mocked_requests_get.error_count > 0: + mocked_requests_get.error_count -= 1 + return MockResponse({"reason": "test_reason", "json_data" : {"error":"recoverable", "message":"nah"}}, 500) + else: # return the number of errors if set above 0 + mocked_requests_get.error_count = -1 + return MockResponse({"json_data" : {"access_token": "test_token", "expires_in": 4000}}, 200) + elif kwargs['url'] == 'https://api.segment.io/v1/batch': + return MockResponse({}, 200) + print("Unhandled mock URL") + return MockResponse({'text':'Unhandled mock URL error'}, 404) +mocked_requests_get.error_count = -1 + +class TestOauthManager(unittest.TestCase): + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_success(self, mock_post): + manager = segment.analytics.oauth_manager.OauthManager("id", privatekey, "keyid", "http://127.0.0.1:80") + self.assertEqual(manager.get_token(), "test_token") + self.assertEqual(manager.max_retries, 3) + self.assertEqual(manager.scope, "tracking_api:write") + self.assertEqual(manager.auth_server, "http://127.0.0.1:80") + self.assertEqual(manager.timeout, 15) + self.assertTrue(manager.thread.is_alive) + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_fail_unrecoverably(self, mock_post): + manager = segment.analytics.oauth_manager.OauthManager("id", privatekey, "keyid", "http://127.0.0.1:400") + with self.assertRaises(Exception) as context: + manager.get_token() + self.assertTrue(manager.thread.is_alive) + self.assertEqual(mock_post.call_count, 1) + manager.thread.cancel() + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_fail_with_retries(self, mock_post): + manager = segment.analytics.oauth_manager.OauthManager("id", privatekey, "keyid", "http://127.0.0.1:500") + with self.assertRaises(Exception) as context: + manager.get_token() + self.assertTrue(manager.thread.is_alive) + self.assertEqual(mock_post.call_count, 3) + manager.thread.cancel() + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + @mock.patch('time.sleep', spec=time.sleep) # 429 uses sleep so it won't be interrupted + def test_oauth_rate_limit_delay(self, mock_sleep, mock_post): + manager = segment.analytics.oauth_manager.OauthManager("id", privatekey, "keyid", "http://127.0.0.1:429") + manager._poller_loop() + mock_sleep.assert_called_with(234) + +class 
TestOauthIntegration(unittest.TestCase): + def fail(self, e, batch=[]): + self.failed = True + + def setUp(self): + self.failed = False + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_integration_success(self, mock_post): + client = Client("write_key", on_error=self.fail, oauth_auth_server="http://127.0.0.1:80", + oauth_client_id="id",oauth_client_key=privatekey, oauth_key_id="keyid") + client.track("user", "event") + client.flush() + self.assertFalse(self.failed) + self.assertEqual(mock_post.call_count, 2) + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_integration_failure(self, mock_post): + client = Client("write_key", on_error=self.fail, oauth_auth_server="http://127.0.0.1:400", + oauth_client_id="id",oauth_client_key=privatekey, oauth_key_id="keyid") + client.track("user", "event") + client.flush() + self.assertTrue(self.failed) + self.assertEqual(mock_post.call_count, 1) + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_integration_recovery(self, mock_post): + mocked_requests_get.error_count = 2 # 2 errors and then success + client = Client("write_key", on_error=self.fail, oauth_auth_server="http://127.0.0.1:501", + oauth_client_id="id",oauth_client_key=privatekey, oauth_key_id="keyid") + client.track("user", "event") + client.flush() + self.assertFalse(self.failed) + self.assertEqual(mock_post.call_count, 4) + + @mock.patch.object(requests.Session, 'post', side_effect=mocked_requests_get) + def test_oauth_integration_fail_bad_key(self, mock_post): + client = Client("write_key", on_error=self.fail, oauth_auth_server="http://127.0.0.1:80", + oauth_client_id="id",oauth_client_key="badkey", oauth_key_id="keyid") + client.track("user", "event") + client.flush() + self.assertTrue(self.failed) + +if __name__ == '__main__': + unittest.main() diff --git a/segment/analytics/test/test_request.py b/segment/analytics/test/test_request.py new file mode 100644 index 00000000..5ffca009 --- /dev/null +++ b/segment/analytics/test/test_request.py @@ -0,0 +1,74 @@ +from datetime import datetime, date +import unittest +import json +import requests +from unittest import mock + +from segment.analytics.request import post, DatetimeSerializer + + +class TestRequests(unittest.TestCase): + + def test_valid_request(self): + res = post('testsecret', batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + self.assertEqual(res.status_code, 200) + + def test_invalid_request_error(self): + self.assertRaises(Exception, post, 'testsecret', + 'https://api.segment.io', False, '[{]') + + def test_invalid_host(self): + self.assertRaises(Exception, post, 'testsecret', + 'api.segment.io/', batch=[]) + + def test_datetime_serialization(self): + data = {'created': datetime(2012, 3, 4, 5, 6, 7, 891011)} + result = json.dumps(data, cls=DatetimeSerializer) + self.assertEqual(result, '{"created": "2012-03-04T05:06:07.891011"}') + + def test_date_serialization(self): + today = date.today() + data = {'created': today} + result = json.dumps(data, cls=DatetimeSerializer) + expected = '{"created": "%s"}' % today.isoformat() + self.assertEqual(result, expected) + + def test_should_not_timeout(self): + res = post('testsecret', batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }], timeout=15) + self.assertEqual(res.status_code, 200) + + def test_should_timeout(self): + with self.assertRaises(requests.ReadTimeout): + 
post('testsecret', batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }], timeout=0.0001) + + def test_proxies(self): + proxies = {'http': '203.243.63.16:80', 'https': '203.243.63.16:80'} + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 200 + res.json.return_value = {'code': 'success', 'message': 'success'} + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + res = post('testsecret', proxies= proxies, batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + self.assertEqual(res.status_code, 200) + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + self.assertIn('proxies', kwargs) + self.assertEqual(kwargs['proxies'], proxies) diff --git a/segment/analytics/test/test_utils.py b/segment/analytics/test/test_utils.py new file mode 100644 index 00000000..43a6fd4b --- /dev/null +++ b/segment/analytics/test/test_utils.py @@ -0,0 +1,73 @@ +from datetime import date, datetime, timedelta +from decimal import Decimal +import unittest + +from dateutil.tz import tzutc + +from segment.analytics import utils + + +class TestUtils(unittest.TestCase): + + def test_timezone_utils(self): + now = datetime.now() + utcnow = datetime.now(tz=tzutc()) + self.assertTrue(utils.is_naive(now)) + self.assertFalse(utils.is_naive(utcnow)) + + fixed = utils.guess_timezone(now) + self.assertFalse(utils.is_naive(fixed)) + + shouldnt_be_edited = utils.guess_timezone(utcnow) + self.assertEqual(utcnow, shouldnt_be_edited) + + def test_clean(self): + simple = { + 'decimal': Decimal('0.142857'), + 'unicode': 'woo', + 'date': datetime.now(), + 'long': 200000000, + 'integer': 1, + 'float': 2.0, + 'bool': True, + 'str': 'woo', + 'none': None + } + + complicated = { + 'exception': Exception('This should show up'), + 'timedelta': timedelta(microseconds=20), + 'list': [1, 2, 3] + } + + combined = dict(simple.items()) + combined.update(complicated.items()) + + pre_clean_keys = combined.keys() + + utils.clean(combined) + self.assertEqual(combined.keys(), pre_clean_keys) + + def test_clean_with_dates(self): + dict_with_dates = { + 'birthdate': date(1980, 1, 1), + 'registration': datetime.utcnow(), + } + self.assertEqual(dict_with_dates, utils.clean(dict_with_dates)) + + @classmethod + def test_bytes(cls): + item = bytes(10) + utils.clean(item) + + def test_clean_fn(self): + cleaned = utils.clean({'fn': lambda x: x, 'number': 4}) + self.assertEqual(cleaned['number'], 4) + if 'fn' in cleaned: + self.assertEqual(cleaned['fn'], None) + + def test_remove_slash(self): + self.assertEqual('http://segment.io', + utils.remove_trailing_slash('http://segment.io/')) + self.assertEqual('http://segment.io', + utils.remove_trailing_slash('http://segment.io')) diff --git a/segment/analytics/utils.py b/segment/analytics/utils.py new file mode 100644 index 00000000..b51ff6b3 --- /dev/null +++ b/segment/analytics/utils.py @@ -0,0 +1,88 @@ +from enum import Enum +import logging +import numbers + +from decimal import Decimal +from datetime import date, datetime +from dateutil.tz import tzlocal, tzutc + +log = logging.getLogger('segment') + + +def is_naive(dt): + """Determines if a given datetime.datetime is naive.""" + return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None + + +def total_seconds(delta): + """Determines total seconds with python < 2.7 compat.""" + # http://stackoverflow.com/questions/3694835/python-2-6-5-divide-timedelta-with-timedelta + return (delta.microseconds + + 
(delta.seconds + delta.days * 24 * 3600) * 1e6) / 1e6 + + +def guess_timezone(dt): + """Attempts to convert a naive datetime to an aware datetime.""" + if is_naive(dt): + # attempts to guess the datetime.datetime.now() local timezone + # case, and then defaults to utc + delta = datetime.now() - dt + if total_seconds(delta) < 5: + # this was created using datetime.datetime.now() + # so we are in the local timezone + return dt.replace(tzinfo=tzlocal()) + # at this point, the best we can do is guess UTC + return dt.replace(tzinfo=tzutc()) + + return dt + + +def remove_trailing_slash(host): + if host.endswith('/'): + return host[:-1] + return host + + +def clean(item): + if isinstance(item, Decimal): + return float(item) + elif isinstance(item, (str, bool, numbers.Number, datetime, + date, type(None))): + return item + elif isinstance(item, (set, list, tuple)): + return _clean_list(item) + elif isinstance(item, dict): + return _clean_dict(item) + elif isinstance(item, Enum): + return clean(item.value) + else: + return _coerce_unicode(item) + + +def _clean_list(list_): + return [clean(item) for item in list_] + + +def _clean_dict(dict_): + data = {} + for k, v in dict_.items(): + try: + data[k] = clean(v) + except TypeError: + log.warning( + 'Dictionary values must be serializable to ' + 'JSON: "%s" value %s of type %s is unsupported.', + k, v, type(v), + ) + return data + + +def _coerce_unicode(cmplx): + # Best-effort decode of unknown objects; anything that cannot be decoded as utf-8 is dropped with a warning. + try: + return cmplx.decode("utf-8", "strict") + except (AttributeError, UnicodeDecodeError) as exception: + log.warning('Error decoding: %s (%s)', cmplx, exception) + return None diff --git a/segment/analytics/version.py b/segment/analytics/version.py new file mode 100644 index 00000000..b4ab188b --- /dev/null +++ b/segment/analytics/version.py @@ -0,0 +1 @@ +VERSION = '2.3.5' diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..2a9acf13 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[bdist_wheel] +universal = 1 diff --git a/setup.py b/setup.py index 25b455d2..0443d7b2 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,3 @@ - import os import sys @@ -6,35 +5,59 @@ from setuptools import setup except ImportError: from distutils.core import setup - -# Don't import stripe module here, since deps may not be installed -sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'analytics')) +# Don't import analytics-python module here, since deps may not be installed +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'segment','analytics')) from version import VERSION long_description = ''' -Segment.io is the simplest way to integrate analytics into your application. +Segment is the simplest way to integrate analytics into your application. One API allows you to turn on any other analytics service. No more learning new APIs, repeated code, and wasted development time. -This is the official python client that wraps the Segment.io REST API (https://segment.io). +This is the official python client that wraps the Segment REST API (https://segment.com). 
Documentation and more details at https://github.com/segmentio/analytics-python ''' +install_requires = [ + "requests~=2.7", + "backoff~=2.1", + "python-dateutil~=2.2", + "PyJWT~=2.10.1" +] + +tests_require = [ + "mock==2.0.0", + "pylint==3.3.1", + "flake8==3.7.9", +] + setup( - name='analytics-python', + name='segment-analytics-python', version=VERSION, url='https://github.com/segmentio/analytics-python', - author='Ilya Volodarsky', - author_email='ilya@segment.io', - maintainer='Segment.io', - maintainer_email='friends@segment.io', - packages=['analytics'], + author='Segment', + author_email='friends@segment.com', + maintainer='Segment', + maintainer_email='friends@segment.com', + test_suite='segment.analytics.test.all', + packages=['segment.analytics', 'segment.analytics.test'], + python_requires='>=3.9.0', license='MIT License', - install_requires=[ - 'requests', - 'python-dateutil' - ], + install_requires=install_requires, + extras_require={ + 'test': tests_require + }, description='The hassle-free way to integrate analytics into any python application.', - long_description=long_description + long_description=long_description, + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + ], ) diff --git a/simulator.py b/simulator.py new file mode 100644 index 00000000..df83de67 --- /dev/null +++ b/simulator.py @@ -0,0 +1,99 @@ +import logging +import argparse +import json +import segment.analytics as analytics + +__name__ = 'simulator.py' +__version__ = '0.0.1' +__description__ = 'scripting simulator' + + +def json_hash(str): + if str: + return json.loads(str) + +# analytics -method= -segment-write-key= [options] + + +parser = argparse.ArgumentParser(description='send a segment message') + +parser.add_argument('--writeKey', help='the Segment writeKey') +parser.add_argument('--type', help='The Segment message type') + +parser.add_argument('--userId', help='the user id to send the event as') +parser.add_argument( + '--anonymousId', help='the anonymous user id to send the event as') +parser.add_argument( + '--context', help='additional context for the event (JSON-encoded)') + +parser.add_argument('--event', help='the event name to send with the event') +parser.add_argument( + '--properties', help='the event properties to send (JSON-encoded)') + +parser.add_argument( + '--name', help='name of the screen or page to send with the message') + +parser.add_argument( + '--traits', help='the identify/group traits to send (JSON-encoded)') + +parser.add_argument('--groupId', help='the group id') + +options = parser.parse_args() + + +def failed(status, msg): + raise Exception(msg) + + +def track(): + analytics.track(options.userId, options.event, anonymous_id=options.anonymousId, + properties=json_hash(options.properties), context=json_hash(options.context)) + + +def page(): + analytics.page(options.userId, name=options.name, anonymous_id=options.anonymousId, + properties=json_hash(options.properties), context=json_hash(options.context)) + + +def screen(): + analytics.screen(options.userId, name=options.name, anonymous_id=options.anonymousId, + properties=json_hash(options.properties), context=json_hash(options.context)) + + +def identify(): + analytics.identify(options.userId, anonymous_id=options.anonymousId, 
+ traits=json_hash(options.traits), context=json_hash(options.context)) + + +def group(): + analytics.group(options.userId, options.groupId, json_hash(options.traits), + json_hash(options.context), anonymous_id=options.anonymousId) + + +def unknown(): + print() + + +analytics.write_key = options.writeKey +analytics.on_error = failed +analytics.debug = True + +log = logging.getLogger('segment') +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +log.addHandler(ch) + +switcher = { + "track": track, + "page": page, + "screen": screen, + "identify": identify, + "group": group +} + +func = switcher.get(options.type) +if func: + func() + analytics.shutdown() +else: + print("Invalid Message Type " + options.type) diff --git a/test.py b/test.py deleted file mode 100644 index 57e837ef..00000000 --- a/test.py +++ /dev/null @@ -1,295 +0,0 @@ -import unittest - -from datetime import datetime, timedelta - -from random import randint -from time import sleep, time -from decimal import * - -import logging -logging.basicConfig() - -from dateutil.tz import tzutc - -import analytics -import analytics.utils - -secret = 'testsecret' - - -def on_success(data, response): - print 'Success', response - - -def on_failure(data, error): - print 'Failure', error - - -class AnalyticsBasicTests(unittest.TestCase): - - def setUp(self): - analytics.init(secret, log_level=logging.DEBUG) - - analytics.on_success(on_success) - analytics.on_failure(on_failure) - - def test_timezone_utils(self): - - now = datetime.now() - utcnow = datetime.now(tz=tzutc()) - - self.assertTrue(analytics.utils.is_naive(now)) - self.assertFalse(analytics.utils.is_naive(utcnow)) - - fixed = analytics.utils.guess_timezone(now) - - self.assertFalse(analytics.utils.is_naive(fixed)) - - shouldnt_be_edited = analytics.utils.guess_timezone(utcnow) - - self.assertEqual(utcnow, shouldnt_be_edited) - - def test_clean(self): - - simple = { - 'integer': 1, - 'float': 2.0, - 'long': 200000000, - 'bool': True, - 'str': 'woo', - 'unicode': u'woo', - 'decimal': Decimal('0.142857'), - 'date': datetime.now(), - } - - complicated = { - 'exception': Exception('This should show up'), - 'timedelta': timedelta(microseconds=20), - 'list': [1, 2, 3] - } - - combined = dict(simple.items() + complicated.items()) - - pre_clean_keys = combined.keys() - - analytics.default_client._clean(combined) - - self.assertEqual(combined.keys(), pre_clean_keys) - - def test_async_basic_identify(self): - # flush after every message - analytics.default_client.flush_at = 1 - analytics.default_client.async = True - - last_identifies = analytics.stats.identifies - last_successful = analytics.stats.successful - last_flushes = analytics.stats.flushes - - analytics.identify('ilya@analytics.io', { - "Subscription Plan": "Free", - "Friends": 30 - }) - - self.assertEqual(analytics.stats.identifies, last_identifies + 1) - - # this should flush because we set the flush_at to 1 - self.assertEqual(analytics.stats.flushes, last_flushes + 1) - - # this should do nothing, as the async thread is currently active - analytics.flush() - - # we should see no more flushes here - self.assertEqual(analytics.stats.flushes, last_flushes + 1) - - sleep(1) - - self.assertEqual(analytics.stats.successful, last_successful + 1) - - def test_async_basic_track(self): - - analytics.default_client.flush_at = 50 - analytics.default_client.async = True - - last_tracks = analytics.stats.tracks - last_successful = analytics.stats.successful - - analytics.track('ilya@analytics.io', 'Played a Song', { - "Artist": "The 
Beatles", - "Song": "Eleanor Rigby" - }) - - self.assertEqual(analytics.stats.tracks, last_tracks + 1) - - analytics.flush() - - sleep(2) - - self.assertEqual(analytics.stats.successful, last_successful + 1) - - def test_async_full_identify(self): - - analytics.default_client.flush_at = 1 - analytics.default_client.async = True - - last_identifies = analytics.stats.identifies - last_successful = analytics.stats.successful - - traits = { - "Subscription Plan": "Free", - "Friends": 30 - } - - context = { - "ip": "12.31.42.111", - "location": { - "countryCode": "US", - "region": "CA" - }, - "userAgent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) " + - "AppleWebKit/534.53.11 (KHTML, like Gecko) Version/5.1.3 " + - "Safari/534.53.10"), - "language": "en-us" - } - - analytics.identify('ilya@analytics.io', traits, - context=context, timestamp=datetime.now()) - - self.assertEqual(analytics.stats.identifies, last_identifies + 1) - - sleep(2) - - self.assertEqual(analytics.stats.successful, last_successful + 1) - - def test_async_full_track(self): - - analytics.default_client.flush_at = 1 - analytics.default_client.async = True - - last_tracks = analytics.stats.tracks - last_successful = analytics.stats.successful - - properties = { - "Artist": "The Beatles", - "Song": "Eleanor Rigby" - } - - analytics.track('ilya@analytics.io', 'Played a Song', - properties, timestamp=datetime.now()) - - self.assertEqual(analytics.stats.tracks, last_tracks + 1) - - sleep(1) - - self.assertEqual(analytics.stats.successful, last_successful + 1) - - def test_alias(self): - - session_id = str(randint(1000000, 99999999)) - user_id = 'bob+'+session_id + '@gmail.com' - - analytics.default_client.flush_at = 1 - analytics.default_client.async = False - - last_aliases = analytics.stats.aliases - last_successful = analytics.stats.successful - - analytics.identify(session_id, traits={'AnonymousTrait': 'Who am I?'}) - analytics.track(session_id, 'Anonymous Event') - - # alias the user - analytics.alias(session_id, user_id) - - analytics.identify(user_id, traits={'IdentifiedTrait': 'A Hunk'}) - analytics.track(user_id, 'Identified Event') - - self.assertEqual(analytics.stats.aliases, last_aliases + 1) - self.assertEqual(analytics.stats.successful, last_successful + 5) - - def test_blocking_flush(self): - - analytics.default_client.flush_at = 1 - analytics.default_client.async = False - - last_tracks = analytics.stats.tracks - last_successful = analytics.stats.successful - - properties = { - "Artist": "The Beatles", - "Song": "Eleanor Rigby" - } - - analytics.track('ilya@analytics.io', 'Played a Song', - properties, timestamp=datetime.today()) - - self.assertEqual(analytics.stats.tracks, last_tracks + 1) - self.assertEqual(analytics.stats.successful, last_successful + 1) - - def test_time_policy(self): - - analytics.default_client.async = False - analytics.default_client.flush_at = 1 - - # add something so we have a reason to flush - analytics.track('ilya@analytics.io', 'Played a Song', { - "Artist": "The Beatles", - "Song": "Eleanor Rigby" - }) - - # flush to reset flush count - analytics.flush() - - last_flushes = analytics.stats.flushes - - # set the flush size trigger high - analytics.default_client.flush_at = 50 - # set the time policy to 1 second from now - analytics.default_client.flush_after = timedelta(seconds=1) - - analytics.track('ilya@analytics.io', 'Played a Song', { - "Artist": "The Beatles", - "Song": "Eleanor Rigby" - }) - - # that shouldn't of triggered a flush - self.assertEqual(analytics.stats.flushes, 
last_flushes) - - # sleep past the time-flush policy - sleep(1.2) - - # submit another track to trigger the policy - analytics.track('ilya@analytics.io', 'Played a Song', { - "Artist": "The Beatles", - "Song": "Eleanor Rigby" - }) - - self.assertEqual(analytics.stats.flushes, last_flushes + 1) - - def test_performance(self): - - to_send = 100 - - target = analytics.stats.successful + to_send - - analytics.default_client.async = True - analytics.default_client.flush_at = 200 - analytics.default_client.max_flush_size = 50 - analytics.default_client.set_log_level(logging.DEBUG) - - for i in range(to_send): - analytics.track('ilya@analytics.io', 'Played a Song', { - "Artist": "The Beatles", - "Song": "Eleanor Rigby" - }) - - print 'Finished submitting into the queue' - - start = time() - while analytics.stats.successful < target: - print ('Successful ', analytics.stats.successful, 'Left', - (target - analytics.stats.successful), - 'Duration ', (time() - start)) - analytics.flush() - sleep(1.0) - -if __name__ == '__main__': - unittest.main()
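The OAuth manager and integration tests above exercise the new token flow: the client signs a request with the configured private key (PyJWT is now a dependency in setup.py), exchanges it for a bearer token at the auth server, retries recoverable token failures up to max_retries (3), and sleeps for the X-RateLimit-Reset value on 429 responses. As a rough usage sketch only, not part of this change set: the snippet below wires up the same constructor keywords that TestOauthIntegration passes; the import path is assumed from the test module, and the write key, client id, key id, auth-server URL, and PEM key are all placeholders.

# Hypothetical usage sketch based on the arguments exercised by TestOauthIntegration.
# Every credential below is a placeholder, not a value taken from this diff.
from segment.analytics.client import Client

PRIVATE_KEY_PEM = """-----BEGIN PRIVATE KEY-----
(placeholder PEM body, analogous to the `privatekey` fixture in the tests)
-----END PRIVATE KEY-----"""

def on_error(error, batch):
    # Mirrors the fail() callback used by the integration tests: invoked when a
    # batch upload or token request ultimately gives up.
    print("segment upload failed:", error)

client = Client(
    "YOUR_WRITE_KEY",
    on_error=on_error,
    oauth_auth_server="https://your-auth-server.example.com",  # placeholder URL
    oauth_client_id="YOUR_OAUTH_CLIENT_ID",
    oauth_client_key=PRIVATE_KEY_PEM,
    oauth_key_id="YOUR_KEY_ID",
)

client.track("user_id", "python event")
client.flush()  # force synchronous delivery, as the integration tests do

With a valid key the manager keeps refreshing the token on a background thread (manager.thread in the unit tests); on_error fires when token acquisition fails for good, which is the behaviour the 400, 500, and 501 mock URLs above simulate.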